diff options
Diffstat (limited to 'scripts/external_libs/scapy-python3-0.18/scapy/asn1fields.py')
-rw-r--r-- | scripts/external_libs/scapy-python3-0.18/scapy/asn1fields.py | 341 |
1 files changed, 341 insertions, 0 deletions
diff --git a/scripts/external_libs/scapy-python3-0.18/scapy/asn1fields.py b/scripts/external_libs/scapy-python3-0.18/scapy/asn1fields.py new file mode 100644 index 00000000..e7165a83 --- /dev/null +++ b/scripts/external_libs/scapy-python3-0.18/scapy/asn1fields.py @@ -0,0 +1,341 @@ +## This file is part of Scapy +## See http://www.secdev.org/projects/scapy for more informations +## Copyright (C) Philippe Biondi <phil@secdev.org> +## This program is published under a GPLv2 license + +""" +Classes that implement ASN.1 data structures. +""" + +import itertools +from scapy.asn1.asn1 import * +from scapy.asn1.ber import * +from scapy.volatile import * +from scapy.base_classes import BasePacket + + +##################### +#### ASN1 Fields #### +##################### + +class ASN1F_badsequence(Exception): + pass + +class ASN1F_element: + pass + +class ASN1F_optionnal(ASN1F_element): + def __init__(self, field): + self._field=field + def __getattr__(self, attr): + return getattr(self._field,attr) + def dissect(self,pkt,s): + try: + return self._field.dissect(pkt,s) + except ASN1F_badsequence: + self._field.set_val(pkt,None) + return s + except BER_Decoding_Error: + self._field.set_val(pkt,None) + return s + def build(self, pkt): + if self._field.is_empty(pkt): + return b"" + return self._field.build(pkt) + +class ASN1F_field(ASN1F_element): + holds_packets=0 + islist=0 + + ASN1_tag = ASN1_Class_UNIVERSAL.ANY + context=ASN1_Class_UNIVERSAL + + def __init__(self, name, default, context=None): + if context is not None: + self.context = context + self.name = name + self.default = default + + def i2repr(self, pkt, x): + return repr(x) + def i2h(self, pkt, x): + return x + def any2i(self, pkt, x): + return x + def m2i(self, pkt, x): + return self.ASN1_tag.get_codec(pkt.ASN1_codec).safedec(x, context=self.context) + def i2m(self, pkt, x): + if x is None: + x = 0 + if isinstance(x, ASN1_Object): + if ( self.ASN1_tag == ASN1_Class_UNIVERSAL.ANY + or x.tag == ASN1_Class_UNIVERSAL.RAW + or x.tag == ASN1_Class_UNIVERSAL.ERROR + or self.ASN1_tag == x.tag ): + return x.enc(pkt.ASN1_codec) + else: + raise ASN1_Error("Encoding Error: got %r instead of an %r for field [%s]" % (x, self.ASN1_tag, self.name)) + return self.ASN1_tag.get_codec(pkt.ASN1_codec).enc(x) + + def do_copy(self, x): + if hasattr(x, "copy"): + return x.copy() + if type(x) is list: + x = x[:] + for i in range(len(x)): + if isinstance(x[i], BasePacket): + x[i] = x[i].copy() + return x + + def build(self, pkt): + return self.i2m(pkt, getattr(pkt, self.name)) + + def set_val(self, pkt, val): + setattr(pkt, self.name, val) + def is_empty(self, pkt): + return getattr(pkt,self.name) is None + + def dissect(self, pkt, s): + v,s = self.m2i(pkt, s) + self.set_val(pkt, v) + return s + + def get_fields_list(self): + return [self] + + def __hash__(self): + return hash(self.name) + def __str__(self): + return self.name + def __eq__(self, other): + return self.name == other + def __repr__(self): + return self.name + def randval(self): + return RandInt() + + +class ASN1F_INTEGER(ASN1F_field): + ASN1_tag= ASN1_Class_UNIVERSAL.INTEGER + def randval(self): + return RandNum(-2**64, 2**64-1) + +class ASN1F_BOOLEAN(ASN1F_field): + ASN1_tag= ASN1_Class_UNIVERSAL.BOOLEAN + def randval(self): + return RandChoice(True,False) + +class ASN1F_NULL(ASN1F_INTEGER): + ASN1_tag= ASN1_Class_UNIVERSAL.NULL + +class ASN1F_SEP(ASN1F_NULL): + ASN1_tag= ASN1_Class_UNIVERSAL.SEP + +class ASN1F_enum_INTEGER(ASN1F_INTEGER): + def __init__(self, name, default, enum): + ASN1F_INTEGER.__init__(self, name, default) + i2s = self.i2s = {} + s2i = self.s2i = {} + if type(enum) is list: + keys = range(len(enum)) + else: + keys = enum.keys() + #if filter(lambda x: type(x) is str, keys): + if list(filter(lambda x: type(x) is str, keys)): + i2s,s2i = s2i,i2s + for k in keys: + i2s[k] = enum[k] + s2i[enum[k]] = k + def any2i_one(self, pkt, x): + if type(x) is str: + x = self.s2i[x] + return x + def i2repr_one(self, pkt, x): + return self.i2s.get(x, repr(x)) + + def any2i(self, pkt, x): + if type(x) is list: + return map(lambda z,pkt=pkt:self.any2i_one(pkt,z), x) + else: + return self.any2i_one(pkt,x) + def i2repr(self, pkt, x): + if type(x) is list: + return map(lambda z,pkt=pkt:self.i2repr_one(pkt,z), x) + else: + return self.i2repr_one(pkt,x) + +class ASN1F_ENUMERATED(ASN1F_enum_INTEGER): + ASN1_tag = ASN1_Class_UNIVERSAL.ENUMERATED + +class ASN1F_STRING(ASN1F_field): + ASN1_tag = ASN1_Class_UNIVERSAL.STRING + def randval(self): + return RandString(RandNum(0, 1000)) + +class ASN1F_PRINTABLE_STRING(ASN1F_STRING): + ASN1_tag = ASN1_Class_UNIVERSAL.PRINTABLE_STRING + +class ASN1F_BIT_STRING(ASN1F_STRING): + ASN1_tag = ASN1_Class_UNIVERSAL.BIT_STRING + +class ASN1F_IPADDRESS(ASN1F_STRING): + ASN1_tag = ASN1_Class_UNIVERSAL.IPADDRESS + +class ASN1F_TIME_TICKS(ASN1F_INTEGER): + ASN1_tag = ASN1_Class_UNIVERSAL.TIME_TICKS + +class ASN1F_UTC_TIME(ASN1F_STRING): + ASN1_tag = ASN1_Class_UNIVERSAL.UTC_TIME + +class ASN1F_GENERALIZED_TIME(ASN1F_STRING): + ASN1_tag = ASN1_Class_UNIVERSAL.GENERALIZED_TIME + +class ASN1F_OID(ASN1F_field): + ASN1_tag = ASN1_Class_UNIVERSAL.OID + def randval(self): + return RandOID() + +class ASN1F_SEQUENCE(ASN1F_field): + ASN1_tag = ASN1_Class_UNIVERSAL.SEQUENCE + def __init__(self, *seq, **kargs): + if "ASN1_tag" in kargs: + self.ASN1_tag = kargs["ASN1_tag"] + self.seq = seq + def __repr__(self): + return "<%s%r>" % (self.__class__.__name__,self.seq,) + def set_val(self, pkt, val): + for f in self.seq: + f.set_val(pkt,val) + def is_empty(self, pkt): + for f in self.seq: + if not f.is_empty(pkt): + return False + return True + def get_fields_list(self): + #return reduce(lambda x,y: x+y.get_fields_list(), self.seq, []) + return list(itertools.chain(*[ i.get_fields_list() for i in self.seq ])) + def build(self, pkt): + #s = reduce(lambda x,y: x+y.build(pkt), self.seq, b"") + s = b"" + for i in self.seq: + s += i.build(pkt) + return self.i2m(pkt, s) + def dissect(self, pkt, s): + codec = self.ASN1_tag.get_codec(pkt.ASN1_codec) + try: + i,s,remain = codec.check_type_check_len(s) + for obj in self.seq: + s = obj.dissect(pkt,s) + if s: + warning("Too many bytes to decode sequence: [%r]" % s) # XXX not reversible! + return remain + except ASN1_Error as e: + raise ASN1F_badsequence(e) + +class ASN1F_SET(ASN1F_SEQUENCE): + ASN1_tag = ASN1_Class_UNIVERSAL.SET + +class ASN1F_SEQUENCE_OF(ASN1F_SEQUENCE): + holds_packets = 1 + islist = 1 + def __init__(self, name, default, asn1pkt, ASN1_tag=0x30): + self.asn1pkt = asn1pkt + self.tag = bytes([ASN1_tag]) + self.name = name + self.default = default + def i2repr(self, pkt, i): + if i is None: + return [] + return i + def get_fields_list(self): + return [self] + def set_val(self, pkt, val): + ASN1F_field.set_val(self, pkt, val) + def is_empty(self, pkt): + return ASN1F_field.is_empty(self, pkt) + def build(self, pkt): + val = getattr(pkt, self.name) + if isinstance(val, ASN1_Object) and val.tag == ASN1_Class_UNIVERSAL.RAW: + s = val + elif val is None: + s = b"" + else: + #print(val) + #s = b"".join(map(str, val )) + s = b"".join([ bytes(i) for i in val ]) + return self.i2m(pkt, s) + def dissect(self, pkt, s): + codec = self.ASN1_tag.get_codec(pkt.ASN1_codec) + i,s1,remain = codec.check_type_check_len(s) + lst = [] + while s1: + try: + p = self.asn1pkt(s1) + except ASN1F_badsequence as e: + lst.append(conf.raw_layer(s1)) + break + lst.append(p) + if conf.raw_layer in p: + s1 = p[conf.raw_layer].load + del(p[conf.raw_layer].underlayer.payload) + else: + break + self.set_val(pkt, lst) + return remain + def randval(self): + return fuzz(self.asn1pkt()) + def __repr__(self): + return "<%s %s>" % (self.__class__.__name__,self.name) + +class ASN1F_PACKET(ASN1F_field): + holds_packets = 1 + def __init__(self, name, default, cls): + ASN1F_field.__init__(self, name, default) + self.cls = cls + def i2m(self, pkt, x): + if x is None: + x = b"" + return bytes(x) + def extract_packet(self, cls, x): + try: + c = cls(x) + except ASN1F_badsequence: + c = conf.raw_layer(x) + cpad = c.getlayer(conf.padding_layer) + x = b"" + if cpad is not None: + x = cpad.load + del(cpad.underlayer.payload) + return c,x + def m2i(self, pkt, x): + return self.extract_packet(self.cls, x) + + +class ASN1F_CHOICE(ASN1F_PACKET): + ASN1_tag = ASN1_Class_UNIVERSAL.NONE + def __init__(self, name, default, *args): + self.name=name + self.choice = {} + for p in args: + self.choice[p.ASN1_root.ASN1_tag] = p +# self.context=context + self.default=default + def m2i(self, pkt, x): + if len(x) == 0: + return conf.raw_layer(),b"" + raise ASN1_Error("ASN1F_CHOICE: got empty string") + #if ord(x[0]) not in self.choice: + if (x[0]) not in self.choice: + return conf.raw_layer(x),b"" # XXX return RawASN1 packet ? Raise error + #raise ASN1_Error("Decoding Error: choice [%i] not found in %r" % (ord(x[0]), self.choice.keys())) + raise ASN1_Error("Decoding Error: choice [%i] not found in %r" % ((x[0]), self.choice.keys())) + + #z = ASN1F_PACKET.extract_packet(self, self.choice[ord(x[0])], x) + z = ASN1F_PACKET.extract_packet(self, self.choice[(x[0])], x) + return z + def randval(self): + return RandChoice(*map(lambda x:fuzz(x()), self.choice.values())) + + +# This import must come in last to avoid problems with cyclic dependencies +import scapy.packet |