summaryrefslogtreecommitdiffstats
path: root/scripts/external_libs/scapy-python3-0.18/scapy/plist.py
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/external_libs/scapy-python3-0.18/scapy/plist.py')
-rw-r--r--scripts/external_libs/scapy-python3-0.18/scapy/plist.py517
1 files changed, 0 insertions, 517 deletions
diff --git a/scripts/external_libs/scapy-python3-0.18/scapy/plist.py b/scripts/external_libs/scapy-python3-0.18/scapy/plist.py
deleted file mode 100644
index bdf0b757..00000000
--- a/scripts/external_libs/scapy-python3-0.18/scapy/plist.py
+++ /dev/null
@@ -1,517 +0,0 @@
-## This file is part of Scapy
-## See http://www.secdev.org/projects/scapy for more informations
-## Copyright (C) Philippe Biondi <phil@secdev.org>
-## This program is published under a GPLv2 license
-
-"""
-PacketList: holds several packets and allows to do operations on them.
-"""
-
-
-import os,subprocess
-from .config import conf
-from .base_classes import BasePacket,BasePacketList
-from collections import defaultdict
-
-from .utils import do_graph,hexdump,make_table,make_lined_table,make_tex_table,get_temp_file
-
-from scapy.arch import NETWORKX
-if NETWORKX:
- import networkx as nx
-
-
-#############
-## Results ##
-#############
-
-class PacketList(BasePacketList):
- res = []
- def __init__(self, res=None, name="PacketList", stats=None, vector_index = None):
- """create a packet list from a list of packets
- res: the list of packets
- stats: a list of classes that will appear in the stats (defaults to [TCP,UDP,ICMP])"""
- if stats is None:
- stats = conf.stats_classic_protocols
- self.stats = stats
- if res is None:
- res = []
- if isinstance(res, PacketList):
- res = res.res
- self.res = res
- self.listname = name
- self.vector_index = vector_index
- def __len__(self):
- return len(self.res)
- def _elt2pkt(self, elt):
- if self.vector_index == None:
- return elt
- else:
- return elt[self.vector_index]
- def _elt2sum(self, elt):
- if self.vector_index == None:
- return elt.summary()
- else:
- return "%s ==> %s" % (elt[0].summary(),elt[1].summary())
-
- def _elt2show(self, elt):
- return self._elt2sum(elt)
- def __repr__(self):
- stats=dict.fromkeys(self.stats,0)
- other = 0
- for r in self.res:
- f = 0
- for p in stats:
- if self._elt2pkt(r).haslayer(p):
- stats[p] += 1
- f = 1
- break
- if not f:
- other += 1
- s = ""
- ct = conf.color_theme
- for p in self.stats:
- s += " %s%s%s" % (ct.packetlist_proto(p.name),
- ct.punct(":"),
- ct.packetlist_value(stats[p]))
- s += " %s%s%s" % (ct.packetlist_proto("Other"),
- ct.punct(":"),
- ct.packetlist_value(other))
- return "%s%s%s%s%s" % (ct.punct("<"),
- ct.packetlist_name(self.listname),
- ct.punct(":"),
- s,
- ct.punct(">"))
- def __getattr__(self, attr):
- return getattr(self.res, attr)
- def __getitem__(self, item):
- if isinstance(item,type) and issubclass(item,BasePacket):
- #return self.__class__(filter(lambda x: item in self._elt2pkt(x),self.res),
- return self.__class__([ x for x in self.res if item in self._elt2pkt(x) ],
- name="%s from %s"%(item.__name__,self.listname))
- if type(item) is slice:
- return self.__class__(self.res.__getitem__(item),
- name = "mod %s" % self.listname)
- return self.res.__getitem__(item)
- def __getslice__(self, *args, **kargs):
- return self.__class__(self.res.__getslice__(*args, **kargs),
- name="mod %s"%self.listname)
- def __add__(self, other):
- return self.__class__(self.res+other.res,
- name="%s+%s"%(self.listname,other.listname))
- def summary(self, prn=None, lfilter=None):
- """prints a summary of each packet
-prn: function to apply to each packet instead of lambda x:x.summary()
-lfilter: truth function to apply to each packet to decide whether it will be displayed"""
- for r in self.res:
- if lfilter is not None:
- if not lfilter(r):
- continue
- if prn is None:
- print(self._elt2sum(r))
- else:
- print(prn(r))
- def nsummary(self,prn=None, lfilter=None):
- """prints a summary of each packet with the packet's number
-prn: function to apply to each packet instead of lambda x:x.summary()
-lfilter: truth function to apply to each packet to decide whether it will be displayed"""
- for i, p in enumerate(self.res):
- if lfilter is not None:
- if not lfilter(p):
- continue
- print(conf.color_theme.id(i,fmt="%04i"), end = " ")
- if prn is None:
- print(self._elt2sum(p))
- else:
- print(prn(p))
- def display(self): # Deprecated. Use show()
- """deprecated. is show()"""
- self.show()
- def show(self, *args, **kargs):
- """Best way to display the packet list. Defaults to nsummary() method"""
- return self.nsummary(*args, **kargs)
-
- def filter(self, func):
- """Returns a packet list filtered by a truth function"""
- return self.__class__(list(filter(func,self.res)),
- name="filtered %s"%self.listname)
-
- def plot(self, f, lfilter=None,**kargs):
- """Applies a function to each packet to get a value that will be plotted with matplotlib. A matplotlib object is returned
- lfilter: a truth function that decides whether a packet must be ploted"""
-
- return plt.plot([ f(i) for i in self.res if not lfilter or lfilter(i) ], **kargs)
-
- def diffplot(self, f, delay=1, lfilter=None, **kargs):
- """diffplot(f, delay=1, lfilter=None)
- Applies a function to couples (l[i],l[i+delay])"""
-
- return plt.plot([ f(i, j) for i in self.res[:-delay] for j in self.res[delay:] if not lfilter or (lfilter(i) and lfilter(j))],
- **kargs)
-
- def multiplot(self, f, lfilter=None, **kargs):
- """Uses a function that returns a label and a value for this label, then plots all the values label by label"""
-
- d = defaultdict(list)
- for i in self.res:
- if lfilter and not lfilter(i):
- continue
- k, v = f(i)
- d[k].append(v)
-
- figure = plt.figure()
- ax = figure.add_axes(plt.axes())
- for i in d:
- ax.plot(d[i], **kargs)
- return figure
-
-
- def rawhexdump(self):
- """Prints an hexadecimal dump of each packet in the list"""
- for p in self:
- hexdump(self._elt2pkt(p))
-
- def hexraw(self, lfilter=None):
- """Same as nsummary(), except that if a packet has a Raw layer, it will be hexdumped
- lfilter: a truth function that decides whether a packet must be displayed"""
- for i,p in enumerate(self.res):
- p1 = self._elt2pkt(p)
- if lfilter is not None and not lfilter(p1):
- continue
- print("%s %s %s" % (conf.color_theme.id(i,fmt="%04i"),
- p1.sprintf("%.time%"),
- self._elt2sum(p)))
- if p1.haslayer(conf.raw_layer):
- hexdump(p1.getlayer(conf.raw_layer).load)
-
- def hexdump(self, lfilter=None):
- """Same as nsummary(), except that packets are also hexdumped
- lfilter: a truth function that decides whether a packet must be displayed"""
- for i,p in enumerate(self.res):
- p1 = self._elt2pkt(p)
- if lfilter is not None and not lfilter(p1):
- continue
- print("%s %s %s" % (conf.color_theme.id(i,fmt="%04i"),
- p1.sprintf("%.time%"),
- self._elt2sum(p)))
- hexdump(p1)
-
- def padding(self, lfilter=None):
- """Same as hexraw(), for Padding layer"""
- for i,p in enumerate(self.res):
- p1 = self._elt2pkt(p)
- if p1.haslayer(conf.padding_layer):
- if lfilter is None or lfilter(p1):
- print("%s %s %s" % (conf.color_theme.id(i,fmt="%04i"),
- p1.sprintf("%.time%"),
- self._elt2sum(p)))
- hexdump(p1.getlayer(conf.padding_layer).load)
-
- def nzpadding(self, lfilter=None):
- """Same as padding() but only non null padding"""
- for i,p in enumerate(self.res):
- p1 = self._elt2pkt(p)
- if p1.haslayer(conf.padding_layer):
- pad = p1.getlayer(conf.padding_layer).load
- if pad == pad[0]*len(pad):
- continue
- if lfilter is None or lfilter(p1):
- print("%s %s %s" % (conf.color_theme.id(i,fmt="%04i"),
- p1.sprintf("%.time%"),
- self._elt2sum(p)))
- hexdump(p1.getlayer(conf.padding_layer).load)
-
-
- def conversations(self, getsrcdst=None, draw = True, **kargs):
- """Graphes a conversations between sources and destinations and display it
- (using graphviz)
- getsrcdst: a function that takes an element of the list and return the source and dest
- by defaults, return source and destination IP
- if networkx library is available returns a DiGraph, or draws it if draw = True otherwise graphviz is used
- format: output type (svg, ps, gif, jpg, etc.), passed to dot's "-T" option
- target: output filename. If None, matplotlib is used to display
- prog: which graphviz program to use"""
- if getsrcdst is None:
- getsrcdst = lambda x:(x['IP'].src, x['IP'].dst)
- conv = {}
- for p in self.res:
- p = self._elt2pkt(p)
- try:
- c = getsrcdst(p)
- except:
- #XXX warning()
- continue
- conv[c] = conv.get(c,0)+1
-
- if NETWORKX: # networkx is available
- gr = nx.DiGraph()
- for s,d in conv:
- if s not in gr:
- gr.add_node(s)
- if d not in gr:
- gr.add_node(d)
- gr.add_edge(s, d)
- if draw:
- return do_graph(gr, **kargs)
- else:
- return gr
- else:
- gr = 'digraph "conv" {\n'
- for s,d in conv:
- gr += '\t "%s" -> "%s"\n' % (s,d)
- gr += "}\n"
- return do_graph(gr, **kargs)
-
- def afterglow(self, src=None, event=None, dst=None, **kargs):
- """Experimental clone attempt of http://sourceforge.net/projects/afterglow
- each datum is reduced as src -> event -> dst and the data are graphed.
- by default we have IP.src -> IP.dport -> IP.dst"""
- if src is None:
- src = lambda x: x['IP'].src
- if event is None:
- event = lambda x: x['IP'].dport
- if dst is None:
- dst = lambda x: x['IP'].dst
- sl = {}
- el = {}
- dl = {}
- for i in self.res:
- try:
- s,e,d = src(i),event(i),dst(i)
- if s in sl:
- n,l = sl[s]
- n += 1
- if e not in l:
- l.append(e)
- sl[s] = (n,l)
- else:
- sl[s] = (1,[e])
- if e in el:
- n,l = el[e]
- n+=1
- if d not in l:
- l.append(d)
- el[e] = (n,l)
- else:
- el[e] = (1,[d])
- dl[d] = dl.get(d,0)+1
- except:
- continue
-
- import math
- def normalize(n):
- return 2+math.log(n)/4.0
-
- def minmax(x):
- m,M = min(x),max(x)
- if m == M:
- m = 0
- if M == 0:
- M = 1
- return m,M
-
- #mins,maxs = minmax(map(lambda (x,y): x, sl.values()))
- mins,maxs = minmax([ a[0] for a in sl.values()])
- #mine,maxe = minmax(map(lambda (x,y): x, el.values()))
- mine,maxe = minmax([ a[0] for a in el.values()])
- mind,maxd = minmax(dl.values())
-
- gr = 'digraph "afterglow" {\n\tedge [len=2.5];\n'
-
- gr += "# src nodes\n"
- for s in sl:
- n,l = sl[s]; n = 1+(n-mins)/(maxs-mins)
- gr += '"src.%s" [label = "%s", shape=box, fillcolor="#FF0000", style=filled, fixedsize=1, height=%.2f,width=%.2f];\n' % (repr(s),repr(s),n,n)
- gr += "# event nodes\n"
- for e in el:
- n,l = el[e]; n = n = 1+(n-mine)/(maxe-mine)
- gr += '"evt.%s" [label = "%s", shape=circle, fillcolor="#00FFFF", style=filled, fixedsize=1, height=%.2f, width=%.2f];\n' % (repr(e),repr(e),n,n)
- for d in dl:
- n = dl[d]; n = n = 1+(n-mind)/(maxd-mind)
- gr += '"dst.%s" [label = "%s", shape=triangle, fillcolor="#0000ff", style=filled, fixedsize=1, height=%.2f, width=%.2f];\n' % (repr(d),repr(d),n,n)
-
- gr += "###\n"
- for s in sl:
- n,l = sl[s]
- for e in l:
- gr += ' "src.%s" -> "evt.%s";\n' % (repr(s),repr(e))
- for e in el:
- n,l = el[e]
- for d in l:
- gr += ' "evt.%s" -> "dst.%s";\n' % (repr(e),repr(d))
-
- gr += "}"
- return do_graph(gr, **kargs)
-
-
- def _dump_document(self, **kargs):
- import pyx
- d = pyx.document.document()
- l = len(self.res)
- for i in range(len(self.res)):
- elt = self.res[i]
- c = self._elt2pkt(elt).canvas_dump(**kargs)
- cbb = c.bbox()
- c.text(cbb.left(),cbb.top()+1,r"\font\cmssfont=cmss12\cmssfont{Frame %i/%i}" % (i,l),[pyx.text.size.LARGE])
- if conf.verb >= 2:
- os.write(1,b".")
- d.append(pyx.document.page(c, paperformat=pyx.document.paperformat.A4,
- margin=1*pyx.unit.t_cm,
- fittosize=1))
- return d
-
-
-
- def psdump(self, filename = None, **kargs):
- """Creates a multipage poscript file with a psdump of every packet
- filename: name of the file to write to. If empty, a temporary file is used and
- conf.prog.psreader is called"""
- d = self._dump_document(**kargs)
- if filename is None:
- filename = get_temp_file(autoext=".ps")
- d.writePSfile(filename)
- subprocess.Popen([conf.prog.psreader, filename+".ps"])
- else:
- d.writePSfile(filename)
- print
-
- def pdfdump(self, filename = None, **kargs):
- """Creates a PDF file with a psdump of every packet
- filename: name of the file to write to. If empty, a temporary file is used and
- conf.prog.pdfreader is called"""
- d = self._dump_document(**kargs)
- if filename is None:
- filename = get_temp_file(autoext=".pdf")
- d.writePDFfile(filename)
- subprocess.Popen([conf.prog.pdfreader, filename+".pdf"])
- else:
- d.writePDFfile(filename)
- print
-
- def sr(self,multi=0):
- """sr([multi=1]) -> (SndRcvList, PacketList)
- Matches packets in the list and return ( (matched couples), (unmatched packets) )"""
- remain = self.res[:]
- sr = []
- i = 0
- while i < len(remain):
- s = remain[i]
- j = i
- while j < len(remain)-1:
- j += 1
- r = remain[j]
- if r.answers(s):
- sr.append((s,r))
- if multi:
- remain[i]._answered=1
- remain[j]._answered=2
- continue
- del(remain[j])
- del(remain[i])
- i -= 1
- break
- i += 1
- if multi:
- remain = filter(lambda x:not hasattr(x,"_answered"), remain)
- return SndRcvList(sr),PacketList(remain)
-
- def sessions(self, session_extractor=None):
- if session_extractor is None:
- def session_extractor(p):
- sess = "Other"
- if 'Ether' in p:
- if 'IP' in p:
- if 'TCP' in p:
- sess = p.sprintf("TCP %IP.src%:%r,TCP.sport% > %IP.dst%:%r,TCP.dport%")
- elif 'UDP' in p:
- sess = p.sprintf("UDP %IP.src%:%r,UDP.sport% > %IP.dst%:%r,UDP.dport%")
- elif 'ICMP' in p:
- sess = p.sprintf("ICMP %IP.src% > %IP.dst% type=%r,ICMP.type% code=%r,ICMP.code% id=%ICMP.id%")
- else:
- sess = p.sprintf("IP %IP.src% > %IP.dst% proto=%IP.proto%")
- elif 'ARP' in p:
- sess = p.sprintf("ARP %ARP.psrc% > %ARP.pdst%")
- else:
- sess = p.sprintf("Ethernet type=%04xr,Ether.type%")
- return sess
- sessions = defaultdict(self.__class__)
- for p in self.res:
- sess = session_extractor(self._elt2pkt(p))
- sessions[sess].append(p)
- return dict(sessions)
-
- def replace(self, *args, **kargs):
- """
- lst.replace(<field>,[<oldvalue>,]<newvalue>)
- lst.replace( (fld,[ov],nv),(fld,[ov,]nv),...)
- if ov is None, all values are replaced
- ex:
- lst.replace( IP.src, "192.168.1.1", "10.0.0.1" )
- lst.replace( IP.ttl, 64 )
- lst.replace( (IP.ttl, 64), (TCP.sport, 666, 777), )
- """
- delete_checksums = kargs.get("delete_checksums",False)
- x=PacketList(name="Replaced %s" % self.listname)
- if type(args[0]) is not tuple:
- args = (args,)
- for p in self.res:
- p = self._elt2pkt(p)
- copied = False
- for scheme in args:
- fld = scheme[0]
- old = scheme[1] # not used if len(scheme) == 2
- new = scheme[-1]
- for o in fld.owners:
- if o in p:
- if len(scheme) == 2 or p[o].getfieldval(fld.name) == old:
- if not copied:
- p = p.copy()
- if delete_checksums:
- p.delete_checksums()
- copied = True
- setattr(p[o], fld.name, new)
- x.append(p)
- return x
-
-
-class SndRcvList(PacketList):
- def __init__(self, res=None, name="Results", stats=None):
- PacketList.__init__(self, res, name, stats, vector_index = 1)
- def summary(self, prn=None, lfilter=None):
- """prints a summary of each SndRcv packet pair
-prn: function to apply to each packet pair instead of lambda s, r: "%s ==> %s" % (s.summary(),r.summary())
-lfilter: truth function to apply to each packet pair to decide whether it will be displayed"""
- for s, r in self.res:
- if lfilter is not None:
- if not lfilter(s, r):
- continue
- if prn is None:
- print(self._elt2sum((s, r)))
- else:
- print(prn(s, r))
- def nsummary(self,prn=None, lfilter=None):
- """prints a summary of each SndRcv packet pair with the pair's number
-prn: function to apply to each packet pair instead of lambda s, r: "%s ==> %s" % (s.summary(),r.summary())
-lfilter: truth function to apply to each packet pair to decide whether it will be displayed"""
- for i, (s, r) in enumerate(self.res):
- if lfilter is not None:
- if not lfilter(s, r):
- continue
- print(conf.color_theme.id(i,fmt="%04i"), end = " ")
- if prn is None:
- print(self._elt2sum((s, r)))
- else:
- print(prn(s, r))
- def filter(self, func):
- """Returns a SndRcv list filtered by a truth function"""
- return self.__class__( [ i for i in self.res if func(*i) ], name='filtered %s'%self.listname)
-
- def make_table(self, *args, **kargs):
- """Prints a table using a function that returs for each packet its head column value, head row value and displayed value
- ex: p.make_table(lambda s, r:(s[IP].dst, r[TCP].sport, s[TCP].sprintf("%flags%")) """
- return make_table(self.res, *args, **kargs)
- def make_lined_table(self, *args, **kargs):
- """Same as make_table, but print a table with lines"""
- return make_lined_table(self.res, *args, **kargs)
- def make_tex_table(self, *args, **kargs):
- """Same as make_table, but print a table with LaTeX syntax"""
- return make_tex_table(self.res, *args, **kargs)