diff options
Diffstat (limited to 'scripts/external_libs/scapy-2.3.1/python3/scapy/asn1')
4 files changed, 852 insertions, 0 deletions
diff --git a/scripts/external_libs/scapy-2.3.1/python3/scapy/asn1/__init__.py b/scripts/external_libs/scapy-2.3.1/python3/scapy/asn1/__init__.py new file mode 100644 index 00000000..4827a588 --- /dev/null +++ b/scripts/external_libs/scapy-2.3.1/python3/scapy/asn1/__init__.py @@ -0,0 +1,12 @@ +## This file is part of Scapy +## See http://www.secdev.org/projects/scapy for more informations +## Copyright (C) Philippe Biondi <phil@secdev.org> +## This program is published under a GPLv2 license + +""" +Package holding ASN.1 related modules. +""" + +# We do not import mib.py because it is more bound to scapy and +# less prone to be used in a standalone fashion +__all__ = ["asn1","ber"] diff --git a/scripts/external_libs/scapy-2.3.1/python3/scapy/asn1/asn1.py b/scripts/external_libs/scapy-2.3.1/python3/scapy/asn1/asn1.py new file mode 100644 index 00000000..c94d0b12 --- /dev/null +++ b/scripts/external_libs/scapy-2.3.1/python3/scapy/asn1/asn1.py @@ -0,0 +1,321 @@ +## This file is part of Scapy +## See http://www.secdev.org/projects/scapy for more informations +## Copyright (C) Philippe Biondi <phil@secdev.org> +## This program is published under a GPLv2 license + +""" +ASN.1 (Abstract Syntax Notation One) +""" + +import random +from scapy.config import conf +from scapy.error import Scapy_Exception,warning +from scapy.volatile import RandField +from scapy.utils import Enum_metaclass, EnumElement + +class RandASN1Object(RandField): + def __init__(self, objlist=None): + if objlist is None: + objlist = [ x._asn1_obj for x in + [ x for x in ASN1_Class_UNIVERSAL.__rdict__.values() if hasattr(x,"_asn1_obj") ]] +# objlist = map(lambda x:x._asn1_obj, +# [ x for x in ASN1_Class_UNIVERSAL.__rdict__.values() if hasattr(x,"_asn1_obj") ]) + self.objlist = objlist + self.chars = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789" + def _fix(self, n=0): + o = random.choice(self.objlist) + if issubclass(o, ASN1_INTEGER): + return o(int(random.gauss(0,1000))) + elif issubclass(o, ASN1_IPADDRESS): + z = RandIP()._fix() + return o(z) + elif issubclass(o, ASN1_STRING): + z = int(random.expovariate(0.05)+1) + return o(bytes([random.choice(self.chars) for i in range(z)])) + elif issubclass(o, ASN1_SEQUENCE) and (n < 10): + z = int(random.expovariate(0.08)+1) +# return o(map(lambda x:x._fix(n+1), [self.__class__(objlist=self.objlist)]*z)) + return o([ x._fix(n+1) for x in [self.__class__(objlist=self.objlist)]*z]) + return ASN1_INTEGER(int(random.gauss(0,1000))) + + +############## +#### ASN1 #### +############## + +class ASN1_Error(Scapy_Exception): + pass + +class ASN1_Encoding_Error(ASN1_Error): + pass + +class ASN1_Decoding_Error(ASN1_Error): + pass + +class ASN1_BadTag_Decoding_Error(ASN1_Decoding_Error): + pass + + + +class ASN1Codec(EnumElement): + def register_stem(cls, stem): + cls._stem = stem + def dec(cls, s, context=None): + return cls._stem.dec(s, context=context) + def safedec(cls, s, context=None): + return cls._stem.safedec(s, context=context) + def get_stem(cls): + return cls.stem + + +class ASN1_Codecs_metaclass(Enum_metaclass): + element_class = ASN1Codec + +class ASN1_Codecs(metaclass = ASN1_Codecs_metaclass): + #__metaclass__ = ASN1_Codecs_metaclass + BER = 1 + DER = 2 + PER = 3 + CER = 4 + LWER = 5 + BACnet = 6 + OER = 7 + SER = 8 + XER = 9 + +class ASN1Tag(EnumElement): + def __init__(self, key, value, context=None, codec=None): + EnumElement.__init__(self, key, value) + self._context = context + if codec == None: + codec = {} + self._codec = codec + def clone(self): # /!\ not a real deep copy. self.codec is shared + return self.__class__(self._key, self._value, self._context, self._codec) + def register_asn1_object(self, asn1obj): + self._asn1_obj = asn1obj + def asn1_object(self, val): + if hasattr(self,"_asn1_obj"): + return self._asn1_obj(val) + raise ASN1_Error("%r does not have any assigned ASN1 object" % self) + def register(self, codecnum, codec): + self._codec[codecnum] = codec + def get_codec(self, codec): + try: + c = self._codec[codec] + except KeyError as msg: + raise ASN1_Error("Codec %r not found for tag %r" % (codec, self)) + return c + +class ASN1_Class_metaclass(Enum_metaclass): + element_class = ASN1Tag + def __new__(cls, name, bases, dct): # XXX factorise a bit with Enum_metaclass.__new__() + for b in bases: + for k,v in b.__dict__.items(): + if k not in dct and isinstance(v,ASN1Tag): + dct[k] = v.clone() + + rdict = {} + for k,v in dct.items(): + if type(v) is int: + v = ASN1Tag(k,v) + dct[k] = v + rdict[v] = v + elif isinstance(v, ASN1Tag): + rdict[v] = v + dct["__rdict__"] = rdict + + cls = type.__new__(cls, name, bases, dct) + for v in cls.__dict__.values(): + if isinstance(v, ASN1Tag): + v.context = cls # overwrite ASN1Tag contexts, even cloned ones + return cls + + +class ASN1_Class(metaclass = ASN1_Class_metaclass): + pass + +class ASN1_Class_UNIVERSAL(ASN1_Class): + name = "UNIVERSAL" + ERROR = -3 + RAW = -2 + NONE = -1 + ANY = 0 + BOOLEAN = 1 + INTEGER = 2 + BIT_STRING = 3 + STRING = 4 + NULL = 5 + OID = 6 + OBJECT_DESCRIPTOR = 7 + EXTERNAL = 8 + REAL = 9 + ENUMERATED = 10 + EMBEDDED_PDF = 11 + UTF8_STRING = 12 + RELATIVE_OID = 13 + SEQUENCE = 0x30#XXX 16 ?? + SET = 0x31 #XXX 17 ?? + NUMERIC_STRING = 18 + PRINTABLE_STRING = 19 + T61_STRING = 20 + VIDEOTEX_STRING = 21 + IA5_STRING = 22 + UTC_TIME = 23 + GENERALIZED_TIME = 24 + GRAPHIC_STRING = 25 + ISO646_STRING = 26 + GENERAL_STRING = 27 + UNIVERSAL_STRING = 28 + CHAR_STRING = 29 + BMP_STRING = 30 + IPADDRESS = 0x40 + COUNTER32 = 0x41 + GAUGE32 = 0x42 + TIME_TICKS = 0x43 + SEP = 0x80 + +class ASN1_Object_metaclass(type): + def __new__(cls, name, bases, dct): + c = super(ASN1_Object_metaclass, cls).__new__(cls, name, bases, dct) + try: + c.tag.register_asn1_object(c) + except: + warning("Error registering %r for %r" % (c.tag, c.codec)) + return c + + +class ASN1_Object(metaclass = ASN1_Object_metaclass): + tag = ASN1_Class_UNIVERSAL.ANY + def __init__(self, val): + self.val = val + def enc(self, codec): + return self.tag.get_codec(codec).enc(self.val) + def __repr__(self): + return "<%s[%r]>" % (self.__dict__.get("name", self.__class__.__name__), self.val) + def __str__(self): + raise Exception("Should not get here") + #return self.enc(conf.ASN1_default_codec) + def __bytes__(self): + return self.enc(conf.ASN1_default_codec) + def strshow(self, lvl=0): + return (" "*lvl)+repr(self)+"\n" + def show(self, lvl=0): + print(self.strshow(lvl)) + def __eq__(self, other): + return self.val == other + def __hash__(self): + return self.val + def __cmp__(self, other): + return cmp(self.val, other) + +class ASN1_DECODING_ERROR(ASN1_Object): + tag = ASN1_Class_UNIVERSAL.ERROR + def __init__(self, val, exc=None): + ASN1_Object.__init__(self, val) + self.exc = exc + def __repr__(self): + return "<%s[%r]{{%s}}>" % (self.__dict__.get("name", self.__class__.__name__), + self.val, self.exc.args[0]) + def enc(self, codec): + if isinstance(self.val, ASN1_Object): + return self.val.enc(codec) + return self.val + +class ASN1_force(ASN1_Object): + tag = ASN1_Class_UNIVERSAL.RAW + def enc(self, codec): + if isinstance(self.val, ASN1_Object): + return self.val.enc(codec) + return self.val + +class ASN1_BADTAG(ASN1_force): + pass + +class ASN1_INTEGER(ASN1_Object): + tag = ASN1_Class_UNIVERSAL.INTEGER + +class ASN1_STRING(ASN1_Object): + tag = ASN1_Class_UNIVERSAL.STRING + def __init__(self, val): + if type(val) is str: + self.val = val.encode('ascii') + elif type(val) is bytes: + self.val = val + else: + raise Exception("Unknown value type for ASN1_STRING") + +class ASN1_BIT_STRING(ASN1_STRING): + tag = ASN1_Class_UNIVERSAL.BIT_STRING + +class ASN1_PRINTABLE_STRING(ASN1_STRING): + tag = ASN1_Class_UNIVERSAL.PRINTABLE_STRING + +class ASN1_T61_STRING(ASN1_STRING): + tag = ASN1_Class_UNIVERSAL.T61_STRING + +class ASN1_IA5_STRING(ASN1_STRING): + tag = ASN1_Class_UNIVERSAL.IA5_STRING + +class ASN1_NUMERIC_STRING(ASN1_STRING): + tag = ASN1_Class_UNIVERSAL.NUMERIC_STRING + +class ASN1_VIDEOTEX_STRING(ASN1_STRING): + tag = ASN1_Class_UNIVERSAL.VIDEOTEX_STRING + +class ASN1_IPADDRESS(ASN1_STRING): + tag = ASN1_Class_UNIVERSAL.IPADDRESS + +class ASN1_UTC_TIME(ASN1_STRING): + tag = ASN1_Class_UNIVERSAL.UTC_TIME + +class ASN1_GENERALIZED_TIME(ASN1_STRING): + tag = ASN1_Class_UNIVERSAL.GENERALIZED_TIME + +class ASN1_TIME_TICKS(ASN1_INTEGER): + tag = ASN1_Class_UNIVERSAL.TIME_TICKS + +class ASN1_BOOLEAN(ASN1_INTEGER): + tag = ASN1_Class_UNIVERSAL.BOOLEAN + +class ASN1_ENUMERATED(ASN1_INTEGER): + tag = ASN1_Class_UNIVERSAL.ENUMERATED + +class ASN1_NULL(ASN1_INTEGER): + tag = ASN1_Class_UNIVERSAL.NULL + +class ASN1_SEP(ASN1_NULL): + tag = ASN1_Class_UNIVERSAL.SEP + +class ASN1_GAUGE32(ASN1_INTEGER): + tag = ASN1_Class_UNIVERSAL.GAUGE32 + +class ASN1_COUNTER32(ASN1_INTEGER): + tag = ASN1_Class_UNIVERSAL.COUNTER32 + +class ASN1_SEQUENCE(ASN1_Object): + tag = ASN1_Class_UNIVERSAL.SEQUENCE + def strshow(self, lvl=0): + s = (" "*lvl)+("# %s:" % self.__class__.__name__)+"\n" + for o in self.val: + s += o.strshow(lvl=lvl+1) + return s + +class ASN1_SET(ASN1_SEQUENCE): + tag = ASN1_Class_UNIVERSAL.SET + +class ASN1_OID(ASN1_Object): + tag = ASN1_Class_UNIVERSAL.OID + def __init__(self, val): + if type(val) is str: + val = val.encode('ascii') + val = conf.mib._oid(val) + ASN1_Object.__init__(self, val) + def __repr__(self): + return "<%s[%r]>" % (self.__dict__.get("name", self.__class__.__name__), conf.mib._oidname(self.val)) + def __oidname__(self): + return '%s'%conf.mib._oidname(self.val) + + + +conf.ASN1_default_codec = ASN1_Codecs.BER diff --git a/scripts/external_libs/scapy-2.3.1/python3/scapy/asn1/ber.py b/scripts/external_libs/scapy-2.3.1/python3/scapy/asn1/ber.py new file mode 100644 index 00000000..48cb1c2d --- /dev/null +++ b/scripts/external_libs/scapy-2.3.1/python3/scapy/asn1/ber.py @@ -0,0 +1,370 @@ +## This file is part of Scapy +## See http://www.secdev.org/projects/scapy for more informations +## Copyright (C) Philippe Biondi <phil@secdev.org> +## This program is published under a GPLv2 license + +""" +Basic Encoding Rules (BER) for ASN.1 +""" + +from scapy.error import warning +from scapy.utils import inet_aton,inet_ntoa +from scapy.asn1.asn1 import ASN1_Decoding_Error,ASN1_Encoding_Error,ASN1_BadTag_Decoding_Error,ASN1_Codecs,ASN1_Class_UNIVERSAL,ASN1_Error,ASN1_DECODING_ERROR,ASN1_BADTAG + +################## +## BER encoding ## +################## + + + +#####[ BER tools ]##### + + +class BER_Exception(Exception): + pass + +class BER_Encoding_Error(ASN1_Encoding_Error): + def __init__(self, msg, encoded=None, remaining=None): + Exception.__init__(self, msg) + self.remaining = remaining + self.encoded = encoded + def __str__(self): + s = Exception.__str__(self) + if isinstance(self.encoded, BERcodec_Object): + s+="\n### Already encoded ###\n%s" % self.encoded.strshow() + else: + s+="\n### Already encoded ###\n%r" % self.encoded + s+="\n### Remaining ###\n%r" % self.remaining + return s + +class BER_Decoding_Error(ASN1_Decoding_Error): + def __init__(self, msg, decoded=None, remaining=None): + Exception.__init__(self, msg) + self.remaining = remaining + self.decoded = decoded + def __str__(self): + s = Exception.__str__(self) + if isinstance(self.decoded, BERcodec_Object): + s+="\n### Already decoded ###\n%s" % self.decoded.strshow() + else: + s+="\n### Already decoded ###\n%r" % self.decoded + s+="\n### Remaining ###\n%r" % self.remaining + return s + +class BER_BadTag_Decoding_Error(BER_Decoding_Error, ASN1_BadTag_Decoding_Error): + pass + +def BER_len_enc(l, size=0): + if l <= 127 and size==0: + return bytes([l]) + s = b"" + while l or size>0: + s = bytes([l&0xff])+s + l >>= 8 + size -= 1 + if len(s) > 127: + raise BER_Exception("BER_len_enc: Length too long (%i) to be encoded [%r]" % (len(s),s)) + return bytes([len(s)|0x80])+s +def BER_len_dec(s): + l = (s[0]) + if not l & 0x80: + return l,s[1:] + l &= 0x7f + if len(s) <= l: + raise BER_Decoding_Error("BER_len_dec: Got %i bytes while expecting %i" % (len(s)-1, l),remaining=s) + ll = 0 + for c in s[1:l+1]: + ll <<= 8 + ll |= (c) + return ll,s[l+1:] + +def BER_num_enc(l, size=1): + x=[] + while l or size>0: + x.insert(0, l & 0x7f) + if len(x) > 1: + x[0] |= 0x80 + l >>= 7 + size -= 1 + return bytes([(k) for k in x]) +def BER_num_dec(s): + x = 0 + for i in range(len(s)): + c = (s[i]) + x <<= 7 + x |= c&0x7f + if not c&0x80: + break + if c&0x80: + raise BER_Decoding_Error("BER_num_dec: unfinished number description", remaining=s) + return x, s[i+1:] + +#####[ BER classes ]##### + +class BERcodec_metaclass(type): + def __new__(cls, name, bases, dct): + c = super(BERcodec_metaclass, cls).__new__(cls, name, bases, dct) + try: + c.tag.register(c.codec, c) + except: + warning("Error registering %r for %r" % (c.tag, c.codec)) + return c + + +class BERcodec_Object( metaclass = BERcodec_metaclass): + codec = ASN1_Codecs.BER + tag = ASN1_Class_UNIVERSAL.ANY + + @classmethod + def asn1_object(cls, val): + return cls.tag.asn1_object(val) + + @classmethod + def check_string(cls, s): + if not s: + raise BER_Decoding_Error("%s: Got empty object while expecting tag %r" % + (cls.__name__,cls.tag), remaining=s) + @classmethod + def check_type(cls, s): + cls.check_string(s) + if hash(cls.tag) != (s[0]): + raise BER_BadTag_Decoding_Error("%s: Got tag [%i/%#x] while expecting %r" % + (cls.__name__, (s[0]), (s[0]),cls.tag), remaining=s) + return s[1:] + @classmethod + def check_type_get_len(cls, s): + s2 = cls.check_type(s) + if not s2: + raise BER_Decoding_Error("%s: No bytes while expecting a length" % + cls.__name__, remaining=s) + return BER_len_dec(s2) + @classmethod + def check_type_check_len(cls, s): + l,s3 = cls.check_type_get_len(s) + if len(s3) < l: + raise BER_Decoding_Error("%s: Got %i bytes while expecting %i" % + (cls.__name__, len(s3), l), remaining=s) + return l,s3[:l],s3[l:] + + @classmethod + def do_dec(cls, s, context=None, safe=False): + if context is None: + context = cls.tag.context + cls.check_string(s) + p = (s[0]) + if p not in context: + t = s + if len(t) > 18: + t = t[:15]+"..." + raise BER_Decoding_Error("Unknown prefix [%02x] for [%r]" % (p,t), remaining=s) + codec = context[p].get_codec(ASN1_Codecs.BER) + return codec.dec(s,context,safe) + + @classmethod + def dec(cls, s, context=None, safe=False): + if not safe: + return cls.do_dec(s, context, safe) + try: + return cls.do_dec(s, context, safe) + except BER_BadTag_Decoding_Error as e: + o,remain = BERcodec_Object.dec(e.remaining, context, safe) + return ASN1_BADTAG(o),remain + except BER_Decoding_Error as e: + return ASN1_DECODING_ERROR(s, exc=e),"" + except ASN1_Error as e: + return ASN1_DECODING_ERROR(s, exc=e),"" + + @classmethod + def safedec(cls, s, context=None): + return cls.dec(s, context, safe=True) + + + @classmethod + def enc(cls, s): + if type(s) is str: + return BERcodec_STRING.enc(s) + #TODO3 wild guess + elif type(s) is bytes: + return BERcodec_STRING.enc(s) + else: + return BERcodec_INTEGER.enc(hash(s)) + + + +ASN1_Codecs.BER.register_stem(BERcodec_Object) + + +class BERcodec_INTEGER(BERcodec_Object): + tag = ASN1_Class_UNIVERSAL.INTEGER + @classmethod + def enc(cls, i): + s = [] + while 1: + s.append(i&0xff) + if -127 <= i < 0: + break + if 128 <= i <= 255: + s.append(0) + i >>= 8 + if not i: + break + #s = map(chr, s) + s = bytes(s) + BER_len_enc(len(s)) + bytes([hash(cls.tag)]) + #s.reverse() + #return b"".join(s) + return s[::-1] + @classmethod + def do_dec(cls, s, context=None, safe=False): + l,s,t = cls.check_type_check_len(s) + x = 0 + if s: + if (s[0])&0x80: # negative int + x = -1 + for c in s: + x <<= 8 + x |= (c) + return cls.asn1_object(x),t + + +class BERcodec_BOOLEAN(BERcodec_INTEGER): + tag = ASN1_Class_UNIVERSAL.BOOLEAN + +class BERcodec_ENUMERATED(BERcodec_INTEGER): + tag = ASN1_Class_UNIVERSAL.ENUMERATED + +class BERcodec_NULL(BERcodec_INTEGER): + tag = ASN1_Class_UNIVERSAL.NULL + @classmethod + def enc(cls, i): + if i == 0: + return bytes([hash(cls.tag)])+b"\0" + else: + return BERcodec_INTEGER.enc(i) + +class BERcodec_SEP(BERcodec_NULL): + tag = ASN1_Class_UNIVERSAL.SEP + +class BERcodec_STRING(BERcodec_Object): + tag = ASN1_Class_UNIVERSAL.STRING + @classmethod + def enc(cls,s): + if type(s) is str: + s = s.encode('ascii') + return bytes([hash(cls.tag)])+BER_len_enc(len(s))+s + @classmethod + def do_dec(cls, s, context=None, safe=False): + l,s,t = cls.check_type_check_len(s) + return cls.tag.asn1_object(s),t + +class BERcodec_BIT_STRING(BERcodec_STRING): + tag = ASN1_Class_UNIVERSAL.BIT_STRING + +class BERcodec_PRINTABLE_STRING(BERcodec_STRING): + tag = ASN1_Class_UNIVERSAL.PRINTABLE_STRING + +class BERcodec_T61_STRING (BERcodec_STRING): + tag = ASN1_Class_UNIVERSAL.T61_STRING + +class BERcodec_IA5_STRING(BERcodec_STRING): + tag = ASN1_Class_UNIVERSAL.IA5_STRING + +class BERcodec_NUMERIC_STRING(BERcodec_STRING): + tag = ASN1_Class_UNIVERSAL.NUMERIC_STRING + +class BERcodec_VIDEOTEX_STRING(BERcodec_STRING): + tag = ASN1_Class_UNIVERSAL.VIDEOTEX_STRING + +class BERcodec_IPADDRESS(BERcodec_STRING): + tag = ASN1_Class_UNIVERSAL.IPADDRESS + + @classmethod + def enc(cls, ipaddr_ascii): + try: + s = inet_aton(ipaddr_ascii) + except Exception: + raise BER_Encoding_Error("IPv4 address could not be encoded") + return bytes([hash(cls.tag)])+BER_len_enc(len(s))+s + + @classmethod + def do_dec(cls, s, context=None, safe=False): + l,s,t = cls.check_type_check_len(s) + try: + ipaddr_ascii = inet_ntoa(s) + except Exception: + raise BER_Decoding_Error("IP address could not be decoded", decoded=obj) + return cls.asn1_object(ipaddr_ascii), t + +class BERcodec_UTC_TIME(BERcodec_STRING): + tag = ASN1_Class_UNIVERSAL.UTC_TIME + +class BERcodec_GENERALIZED_TIME(BERcodec_STRING): + tag = ASN1_Class_UNIVERSAL.GENERALIZED_TIME + +class BERcodec_TIME_TICKS(BERcodec_INTEGER): + tag = ASN1_Class_UNIVERSAL.TIME_TICKS + +class BERcodec_GAUGE32(BERcodec_INTEGER): + tag = ASN1_Class_UNIVERSAL.GAUGE32 + +class BERcodec_COUNTER32(BERcodec_INTEGER): + tag = ASN1_Class_UNIVERSAL.COUNTER32 + +class BERcodec_SEQUENCE(BERcodec_Object): + tag = ASN1_Class_UNIVERSAL.SEQUENCE + @classmethod + def enc(cls, l): + #if type(l) is not str: + if type(l) is not bytes: + l = b"".join(map(lambda x: x.enc(cls.codec), l)) + return bytes([hash(cls.tag)])+BER_len_enc(len(l))+l + @classmethod + def do_dec(cls, s, context=None, safe=False): + if context is None: + context = cls.tag.context + l,st = cls.check_type_get_len(s) # we may have len(s) < l + s,t = st[:l],st[l:] + obj = [] + while s: + try: + o,s = BERcodec_Object.dec(s, context, safe) + except BER_Decoding_Error as err: + err.remaining += t + if err.decoded is not None: + obj.append(err.decoded) + err.decoded = obj + raise + obj.append(o) + if len(st) < l: + raise BER_Decoding_Error("Not enough bytes to decode sequence", decoded=obj) + return cls.asn1_object(obj),t + +class BERcodec_SET(BERcodec_SEQUENCE): + tag = ASN1_Class_UNIVERSAL.SET + + +class BERcodec_OID(BERcodec_Object): + tag = ASN1_Class_UNIVERSAL.OID + + @classmethod + def enc(cls, oid): + if type(oid) is str: + oid = oid.encode('ascii') + lst = [int(x) for x in oid.strip(b".").split(b".")] + if len(lst) >= 2: + lst[1] += 40*lst[0] + del(lst[0]) + s = b"".join([BER_num_enc(k) for k in lst]) + return bytes([hash(cls.tag)])+BER_len_enc(len(s))+s + @classmethod + def do_dec(cls, s, context=None, safe=False): + l,s,t = cls.check_type_check_len(s) + lst = [] + while s: + l,s = BER_num_dec(s) + lst.append(l) + if (len(lst) > 0): + lst.insert(0,lst[0]//40) + lst[1] %= 40 + return cls.asn1_object(b".".join([str(k).encode('ascii') for k in lst])), t + + diff --git a/scripts/external_libs/scapy-2.3.1/python3/scapy/asn1/mib.py b/scripts/external_libs/scapy-2.3.1/python3/scapy/asn1/mib.py new file mode 100644 index 00000000..8f3df25c --- /dev/null +++ b/scripts/external_libs/scapy-2.3.1/python3/scapy/asn1/mib.py @@ -0,0 +1,149 @@ +## This file is part of Scapy +## See http://www.secdev.org/projects/scapy for more informations +## Copyright (C) Philippe Biondi <phil@secdev.org> +## This program is published under a GPLv2 license + +""" +Management Information Base (MIB) parsing +""" + +import re +from glob import glob +from scapy.dadict import DADict,fixname +from scapy.config import conf +from scapy.utils import do_graph + +################# +## MIB parsing ## +################# + +_mib_re_integer = re.compile(b"^[0-9]+$") +_mib_re_both = re.compile(b"^([a-zA-Z_][a-zA-Z0-9_-]*)\(([0-9]+)\)$") +_mib_re_oiddecl = re.compile(b"$\s*([a-zA-Z0-9_-]+)\s+OBJECT([^:\{\}]|\{[^:]+\})+::=\s*\{([^\}]+)\}",re.M) +_mib_re_strings = re.compile(b'"[^"]*"') +_mib_re_comments = re.compile(b'--.*(\r|\n)') + +class MIBDict(DADict): + def _findroot(self, x): + if x.startswith(b"."): + x = x[1:] + if not x.endswith(b"."): + x += b"." + max=0 + root=b"." + for k in self.keys(): + if x.startswith(self[k]+b"."): + if max < len(self[k]): + max = len(self[k]) + root = k + return root, x[max:-1] + def _oidname(self, x): + root,remainder = self._findroot(x) + return root+remainder + def _oid(self, x): + if type(x) is str: + x = x.encode('ascii') + xl = x.strip(b".").split(b".") + p = len(xl)-1 + while p >= 0 and _mib_re_integer.match(xl[p]): + p -= 1 + if p != 0 or xl[p] not in self: + return x + xl[p] = self[xl[p]] + return b".".join(xl[p:]) + def _make_graph(self, other_keys=[], **kargs): + nodes = [(k,self[k]) for k in self.keys()] + oids = [self[k] for k in self.keys()] + for k in other_keys: + if k not in oids: + nodes.append(self.oidname(k),k) + s = 'digraph "mib" {\n\trankdir=LR;\n\n' + for k,o in nodes: + s += '\t"%s" [ label="%s" ];\n' % (o,k) + s += "\n" + for k,o in nodes: + parent,remainder = self._findroot(o[:-1]) + remainder = remainder[1:]+o[-1] + if parent != ".": + parent = self[parent] + s += '\t"%s" -> "%s" [label="%s"];\n' % (parent, o,remainder) + s += "}\n" + do_graph(s, **kargs) + def __len__(self): + return len(self.keys()) + + +def mib_register(ident, value, the_mib, unresolved): + if ident in the_mib or ident in unresolved: + return ident in the_mib + resval = [] + not_resolved = 0 + for v in value: + if _mib_re_integer.match(v): + resval.append(v) + else: + v = fixname(v) + if v not in the_mib: + not_resolved = 1 + if v in the_mib: + v = the_mib[v] + elif v in unresolved: + v = unresolved[v] + if type(v) is list: + resval += v + else: + resval.append(v) + if not_resolved: + unresolved[ident] = resval + return False + else: + the_mib[ident] = resval + keys = unresolved.keys() + i = 0 + while i < len(keys): + k = keys[i] + if mib_register(k,unresolved[k], the_mib, {}): + del(unresolved[k]) + del(keys[i]) + i = 0 + else: + i += 1 + + return True + + +def load_mib(filenames): + the_mib = {'iso': ['1']} + unresolved = {} + for k in conf.mib.keys(): + mib_register(k, conf.mib[k].split("."), the_mib, unresolved) + + if type(filenames) is str: + filenames = [filenames] + for fnames in filenames: + for fname in glob(fnames): + f = open(fname) + text = f.read() + cleantext = " ".join(_mib_re_strings.split(" ".join(_mib_re_comments.split(text)))) + for m in _mib_re_oiddecl.finditer(cleantext): + gr = m.groups() + ident,oid = gr[0],gr[-1] + ident=fixname(ident) + oid = oid.split() + for i in range(len(oid)): + m = _mib_re_both.match(oid[i]) + if m: + oid[i] = m.groups()[1] + mib_register(ident, oid, the_mib, unresolved) + + newmib = MIBDict(_name="MIB") + for k,o in the_mib.items(): + newmib[k]=".".join(o) + for k,o in unresolved.items(): + newmib[k]=".".join(o) + + conf.mib=newmib + + + +conf.mib = MIBDict(_name="MIB") |