summaryrefslogtreecommitdiffstats
path: root/scripts/external_libs/scapy-2.3.1/python3/scapy/asn1fields.py
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/external_libs/scapy-2.3.1/python3/scapy/asn1fields.py')
-rw-r--r--scripts/external_libs/scapy-2.3.1/python3/scapy/asn1fields.py341
1 files changed, 341 insertions, 0 deletions
diff --git a/scripts/external_libs/scapy-2.3.1/python3/scapy/asn1fields.py b/scripts/external_libs/scapy-2.3.1/python3/scapy/asn1fields.py
new file mode 100644
index 00000000..e7165a83
--- /dev/null
+++ b/scripts/external_libs/scapy-2.3.1/python3/scapy/asn1fields.py
@@ -0,0 +1,341 @@
+## This file is part of Scapy
+## See http://www.secdev.org/projects/scapy for more informations
+## Copyright (C) Philippe Biondi <phil@secdev.org>
+## This program is published under a GPLv2 license
+
+"""
+Classes that implement ASN.1 data structures.
+"""
+
+import itertools
+from scapy.asn1.asn1 import *
+from scapy.asn1.ber import *
+from scapy.volatile import *
+from scapy.base_classes import BasePacket
+
+
+#####################
+#### ASN1 Fields ####
+#####################
+
+class ASN1F_badsequence(Exception):
+ pass
+
+class ASN1F_element:
+ pass
+
+class ASN1F_optionnal(ASN1F_element):
+ def __init__(self, field):
+ self._field=field
+ def __getattr__(self, attr):
+ return getattr(self._field,attr)
+ def dissect(self,pkt,s):
+ try:
+ return self._field.dissect(pkt,s)
+ except ASN1F_badsequence:
+ self._field.set_val(pkt,None)
+ return s
+ except BER_Decoding_Error:
+ self._field.set_val(pkt,None)
+ return s
+ def build(self, pkt):
+ if self._field.is_empty(pkt):
+ return b""
+ return self._field.build(pkt)
+
+class ASN1F_field(ASN1F_element):
+ holds_packets=0
+ islist=0
+
+ ASN1_tag = ASN1_Class_UNIVERSAL.ANY
+ context=ASN1_Class_UNIVERSAL
+
+ def __init__(self, name, default, context=None):
+ if context is not None:
+ self.context = context
+ self.name = name
+ self.default = default
+
+ def i2repr(self, pkt, x):
+ return repr(x)
+ def i2h(self, pkt, x):
+ return x
+ def any2i(self, pkt, x):
+ return x
+ def m2i(self, pkt, x):
+ return self.ASN1_tag.get_codec(pkt.ASN1_codec).safedec(x, context=self.context)
+ def i2m(self, pkt, x):
+ if x is None:
+ x = 0
+ if isinstance(x, ASN1_Object):
+ if ( self.ASN1_tag == ASN1_Class_UNIVERSAL.ANY
+ or x.tag == ASN1_Class_UNIVERSAL.RAW
+ or x.tag == ASN1_Class_UNIVERSAL.ERROR
+ or self.ASN1_tag == x.tag ):
+ return x.enc(pkt.ASN1_codec)
+ else:
+ raise ASN1_Error("Encoding Error: got %r instead of an %r for field [%s]" % (x, self.ASN1_tag, self.name))
+ return self.ASN1_tag.get_codec(pkt.ASN1_codec).enc(x)
+
+ def do_copy(self, x):
+ if hasattr(x, "copy"):
+ return x.copy()
+ if type(x) is list:
+ x = x[:]
+ for i in range(len(x)):
+ if isinstance(x[i], BasePacket):
+ x[i] = x[i].copy()
+ return x
+
+ def build(self, pkt):
+ return self.i2m(pkt, getattr(pkt, self.name))
+
+ def set_val(self, pkt, val):
+ setattr(pkt, self.name, val)
+ def is_empty(self, pkt):
+ return getattr(pkt,self.name) is None
+
+ def dissect(self, pkt, s):
+ v,s = self.m2i(pkt, s)
+ self.set_val(pkt, v)
+ return s
+
+ def get_fields_list(self):
+ return [self]
+
+ def __hash__(self):
+ return hash(self.name)
+ def __str__(self):
+ return self.name
+ def __eq__(self, other):
+ return self.name == other
+ def __repr__(self):
+ return self.name
+ def randval(self):
+ return RandInt()
+
+
+class ASN1F_INTEGER(ASN1F_field):
+ ASN1_tag= ASN1_Class_UNIVERSAL.INTEGER
+ def randval(self):
+ return RandNum(-2**64, 2**64-1)
+
+class ASN1F_BOOLEAN(ASN1F_field):
+ ASN1_tag= ASN1_Class_UNIVERSAL.BOOLEAN
+ def randval(self):
+ return RandChoice(True,False)
+
+class ASN1F_NULL(ASN1F_INTEGER):
+ ASN1_tag= ASN1_Class_UNIVERSAL.NULL
+
+class ASN1F_SEP(ASN1F_NULL):
+ ASN1_tag= ASN1_Class_UNIVERSAL.SEP
+
+class ASN1F_enum_INTEGER(ASN1F_INTEGER):
+ def __init__(self, name, default, enum):
+ ASN1F_INTEGER.__init__(self, name, default)
+ i2s = self.i2s = {}
+ s2i = self.s2i = {}
+ if type(enum) is list:
+ keys = range(len(enum))
+ else:
+ keys = enum.keys()
+ #if filter(lambda x: type(x) is str, keys):
+ if list(filter(lambda x: type(x) is str, keys)):
+ i2s,s2i = s2i,i2s
+ for k in keys:
+ i2s[k] = enum[k]
+ s2i[enum[k]] = k
+ def any2i_one(self, pkt, x):
+ if type(x) is str:
+ x = self.s2i[x]
+ return x
+ def i2repr_one(self, pkt, x):
+ return self.i2s.get(x, repr(x))
+
+ def any2i(self, pkt, x):
+ if type(x) is list:
+ return map(lambda z,pkt=pkt:self.any2i_one(pkt,z), x)
+ else:
+ return self.any2i_one(pkt,x)
+ def i2repr(self, pkt, x):
+ if type(x) is list:
+ return map(lambda z,pkt=pkt:self.i2repr_one(pkt,z), x)
+ else:
+ return self.i2repr_one(pkt,x)
+
+class ASN1F_ENUMERATED(ASN1F_enum_INTEGER):
+ ASN1_tag = ASN1_Class_UNIVERSAL.ENUMERATED
+
+class ASN1F_STRING(ASN1F_field):
+ ASN1_tag = ASN1_Class_UNIVERSAL.STRING
+ def randval(self):
+ return RandString(RandNum(0, 1000))
+
+class ASN1F_PRINTABLE_STRING(ASN1F_STRING):
+ ASN1_tag = ASN1_Class_UNIVERSAL.PRINTABLE_STRING
+
+class ASN1F_BIT_STRING(ASN1F_STRING):
+ ASN1_tag = ASN1_Class_UNIVERSAL.BIT_STRING
+
+class ASN1F_IPADDRESS(ASN1F_STRING):
+ ASN1_tag = ASN1_Class_UNIVERSAL.IPADDRESS
+
+class ASN1F_TIME_TICKS(ASN1F_INTEGER):
+ ASN1_tag = ASN1_Class_UNIVERSAL.TIME_TICKS
+
+class ASN1F_UTC_TIME(ASN1F_STRING):
+ ASN1_tag = ASN1_Class_UNIVERSAL.UTC_TIME
+
+class ASN1F_GENERALIZED_TIME(ASN1F_STRING):
+ ASN1_tag = ASN1_Class_UNIVERSAL.GENERALIZED_TIME
+
+class ASN1F_OID(ASN1F_field):
+ ASN1_tag = ASN1_Class_UNIVERSAL.OID
+ def randval(self):
+ return RandOID()
+
+class ASN1F_SEQUENCE(ASN1F_field):
+ ASN1_tag = ASN1_Class_UNIVERSAL.SEQUENCE
+ def __init__(self, *seq, **kargs):
+ if "ASN1_tag" in kargs:
+ self.ASN1_tag = kargs["ASN1_tag"]
+ self.seq = seq
+ def __repr__(self):
+ return "<%s%r>" % (self.__class__.__name__,self.seq,)
+ def set_val(self, pkt, val):
+ for f in self.seq:
+ f.set_val(pkt,val)
+ def is_empty(self, pkt):
+ for f in self.seq:
+ if not f.is_empty(pkt):
+ return False
+ return True
+ def get_fields_list(self):
+ #return reduce(lambda x,y: x+y.get_fields_list(), self.seq, [])
+ return list(itertools.chain(*[ i.get_fields_list() for i in self.seq ]))
+ def build(self, pkt):
+ #s = reduce(lambda x,y: x+y.build(pkt), self.seq, b"")
+ s = b""
+ for i in self.seq:
+ s += i.build(pkt)
+ return self.i2m(pkt, s)
+ def dissect(self, pkt, s):
+ codec = self.ASN1_tag.get_codec(pkt.ASN1_codec)
+ try:
+ i,s,remain = codec.check_type_check_len(s)
+ for obj in self.seq:
+ s = obj.dissect(pkt,s)
+ if s:
+ warning("Too many bytes to decode sequence: [%r]" % s) # XXX not reversible!
+ return remain
+ except ASN1_Error as e:
+ raise ASN1F_badsequence(e)
+
+class ASN1F_SET(ASN1F_SEQUENCE):
+ ASN1_tag = ASN1_Class_UNIVERSAL.SET
+
+class ASN1F_SEQUENCE_OF(ASN1F_SEQUENCE):
+ holds_packets = 1
+ islist = 1
+ def __init__(self, name, default, asn1pkt, ASN1_tag=0x30):
+ self.asn1pkt = asn1pkt
+ self.tag = bytes([ASN1_tag])
+ self.name = name
+ self.default = default
+ def i2repr(self, pkt, i):
+ if i is None:
+ return []
+ return i
+ def get_fields_list(self):
+ return [self]
+ def set_val(self, pkt, val):
+ ASN1F_field.set_val(self, pkt, val)
+ def is_empty(self, pkt):
+ return ASN1F_field.is_empty(self, pkt)
+ def build(self, pkt):
+ val = getattr(pkt, self.name)
+ if isinstance(val, ASN1_Object) and val.tag == ASN1_Class_UNIVERSAL.RAW:
+ s = val
+ elif val is None:
+ s = b""
+ else:
+ #print(val)
+ #s = b"".join(map(str, val ))
+ s = b"".join([ bytes(i) for i in val ])
+ return self.i2m(pkt, s)
+ def dissect(self, pkt, s):
+ codec = self.ASN1_tag.get_codec(pkt.ASN1_codec)
+ i,s1,remain = codec.check_type_check_len(s)
+ lst = []
+ while s1:
+ try:
+ p = self.asn1pkt(s1)
+ except ASN1F_badsequence as e:
+ lst.append(conf.raw_layer(s1))
+ break
+ lst.append(p)
+ if conf.raw_layer in p:
+ s1 = p[conf.raw_layer].load
+ del(p[conf.raw_layer].underlayer.payload)
+ else:
+ break
+ self.set_val(pkt, lst)
+ return remain
+ def randval(self):
+ return fuzz(self.asn1pkt())
+ def __repr__(self):
+ return "<%s %s>" % (self.__class__.__name__,self.name)
+
+class ASN1F_PACKET(ASN1F_field):
+ holds_packets = 1
+ def __init__(self, name, default, cls):
+ ASN1F_field.__init__(self, name, default)
+ self.cls = cls
+ def i2m(self, pkt, x):
+ if x is None:
+ x = b""
+ return bytes(x)
+ def extract_packet(self, cls, x):
+ try:
+ c = cls(x)
+ except ASN1F_badsequence:
+ c = conf.raw_layer(x)
+ cpad = c.getlayer(conf.padding_layer)
+ x = b""
+ if cpad is not None:
+ x = cpad.load
+ del(cpad.underlayer.payload)
+ return c,x
+ def m2i(self, pkt, x):
+ return self.extract_packet(self.cls, x)
+
+
+class ASN1F_CHOICE(ASN1F_PACKET):
+ ASN1_tag = ASN1_Class_UNIVERSAL.NONE
+ def __init__(self, name, default, *args):
+ self.name=name
+ self.choice = {}
+ for p in args:
+ self.choice[p.ASN1_root.ASN1_tag] = p
+# self.context=context
+ self.default=default
+ def m2i(self, pkt, x):
+ if len(x) == 0:
+ return conf.raw_layer(),b""
+ raise ASN1_Error("ASN1F_CHOICE: got empty string")
+ #if ord(x[0]) not in self.choice:
+ if (x[0]) not in self.choice:
+ return conf.raw_layer(x),b"" # XXX return RawASN1 packet ? Raise error
+ #raise ASN1_Error("Decoding Error: choice [%i] not found in %r" % (ord(x[0]), self.choice.keys()))
+ raise ASN1_Error("Decoding Error: choice [%i] not found in %r" % ((x[0]), self.choice.keys()))
+
+ #z = ASN1F_PACKET.extract_packet(self, self.choice[ord(x[0])], x)
+ z = ASN1F_PACKET.extract_packet(self, self.choice[(x[0])], x)
+ return z
+ def randval(self):
+ return RandChoice(*map(lambda x:fuzz(x()), self.choice.values()))
+
+
+# This import must come in last to avoid problems with cyclic dependencies
+import scapy.packet