summaryrefslogtreecommitdiffstats
path: root/scripts/external_libs/scapy-2.3.1/python2/scapy/asn1
diff options
context:
space:
mode:
authorimarom <imarom@cisco.com>2016-03-21 16:03:47 +0200
committerimarom <imarom@cisco.com>2016-03-21 16:03:47 +0200
commitb89efa188810bf95a9d245e69e2961b5721c3b0f (patch)
tree454273ac6c4ae972ebb8a2c86b893296970b4fa9 /scripts/external_libs/scapy-2.3.1/python2/scapy/asn1
parentf72c6df9d2e9998ae1f3529d729ab7930b35785a (diff)
scapy python 2/3
Diffstat (limited to 'scripts/external_libs/scapy-2.3.1/python2/scapy/asn1')
-rw-r--r--scripts/external_libs/scapy-2.3.1/python2/scapy/asn1/__init__.py12
-rw-r--r--scripts/external_libs/scapy-2.3.1/python2/scapy/asn1/asn1.py305
-rw-r--r--scripts/external_libs/scapy-2.3.1/python2/scapy/asn1/ber.py363
-rw-r--r--scripts/external_libs/scapy-2.3.1/python2/scapy/asn1/mib.py147
4 files changed, 827 insertions, 0 deletions
diff --git a/scripts/external_libs/scapy-2.3.1/python2/scapy/asn1/__init__.py b/scripts/external_libs/scapy-2.3.1/python2/scapy/asn1/__init__.py
new file mode 100644
index 00000000..4827a588
--- /dev/null
+++ b/scripts/external_libs/scapy-2.3.1/python2/scapy/asn1/__init__.py
@@ -0,0 +1,12 @@
+## This file is part of Scapy
+## See http://www.secdev.org/projects/scapy for more informations
+## Copyright (C) Philippe Biondi <phil@secdev.org>
+## This program is published under a GPLv2 license
+
+"""
+Package holding ASN.1 related modules.
+"""
+
+# We do not import mib.py because it is more bound to scapy and
+# less prone to be used in a standalone fashion
+__all__ = ["asn1","ber"]
diff --git a/scripts/external_libs/scapy-2.3.1/python2/scapy/asn1/asn1.py b/scripts/external_libs/scapy-2.3.1/python2/scapy/asn1/asn1.py
new file mode 100644
index 00000000..bad7b2cf
--- /dev/null
+++ b/scripts/external_libs/scapy-2.3.1/python2/scapy/asn1/asn1.py
@@ -0,0 +1,305 @@
+## This file is part of Scapy
+## See http://www.secdev.org/projects/scapy for more informations
+## Copyright (C) Philippe Biondi <phil@secdev.org>
+## This program is published under a GPLv2 license
+
+"""
+ASN.1 (Abstract Syntax Notation One)
+"""
+
+import random
+from scapy.config import conf
+from scapy.error import Scapy_Exception,warning
+from scapy.volatile import RandField
+from scapy.utils import Enum_metaclass, EnumElement
+
+class RandASN1Object(RandField):
+ def __init__(self, objlist=None):
+ if objlist is None:
+ objlist = map(lambda x:x._asn1_obj,
+ filter(lambda x:hasattr(x,"_asn1_obj"), ASN1_Class_UNIVERSAL.__rdict__.values()))
+ self.objlist = objlist
+ self.chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"
+ def _fix(self, n=0):
+ o = random.choice(self.objlist)
+ if issubclass(o, ASN1_INTEGER):
+ return o(int(random.gauss(0,1000)))
+ elif issubclass(o, ASN1_IPADDRESS):
+ z = RandIP()._fix()
+ return o(z)
+ elif issubclass(o, ASN1_STRING):
+ z = int(random.expovariate(0.05)+1)
+ return o("".join([random.choice(self.chars) for i in range(z)]))
+ elif issubclass(o, ASN1_SEQUENCE) and (n < 10):
+ z = int(random.expovariate(0.08)+1)
+ return o(map(lambda x:x._fix(n+1), [self.__class__(objlist=self.objlist)]*z))
+ return ASN1_INTEGER(int(random.gauss(0,1000)))
+
+
+##############
+#### ASN1 ####
+##############
+
+class ASN1_Error(Scapy_Exception):
+ pass
+
+class ASN1_Encoding_Error(ASN1_Error):
+ pass
+
+class ASN1_Decoding_Error(ASN1_Error):
+ pass
+
+class ASN1_BadTag_Decoding_Error(ASN1_Decoding_Error):
+ pass
+
+
+
+class ASN1Codec(EnumElement):
+ def register_stem(cls, stem):
+ cls._stem = stem
+ def dec(cls, s, context=None):
+ return cls._stem.dec(s, context=context)
+ def safedec(cls, s, context=None):
+ return cls._stem.safedec(s, context=context)
+ def get_stem(cls):
+ return cls.stem
+
+
+class ASN1_Codecs_metaclass(Enum_metaclass):
+ element_class = ASN1Codec
+
+class ASN1_Codecs:
+ __metaclass__ = ASN1_Codecs_metaclass
+ BER = 1
+ DER = 2
+ PER = 3
+ CER = 4
+ LWER = 5
+ BACnet = 6
+ OER = 7
+ SER = 8
+ XER = 9
+
+class ASN1Tag(EnumElement):
+ def __init__(self, key, value, context=None, codec=None):
+ EnumElement.__init__(self, key, value)
+ self._context = context
+ if codec == None:
+ codec = {}
+ self._codec = codec
+ def clone(self): # /!\ not a real deep copy. self.codec is shared
+ return self.__class__(self._key, self._value, self._context, self._codec)
+ def register_asn1_object(self, asn1obj):
+ self._asn1_obj = asn1obj
+ def asn1_object(self, val):
+ if hasattr(self,"_asn1_obj"):
+ return self._asn1_obj(val)
+ raise ASN1_Error("%r does not have any assigned ASN1 object" % self)
+ def register(self, codecnum, codec):
+ self._codec[codecnum] = codec
+ def get_codec(self, codec):
+ try:
+ c = self._codec[codec]
+ except KeyError,msg:
+ raise ASN1_Error("Codec %r not found for tag %r" % (codec, self))
+ return c
+
+class ASN1_Class_metaclass(Enum_metaclass):
+ element_class = ASN1Tag
+ def __new__(cls, name, bases, dct): # XXX factorise a bit with Enum_metaclass.__new__()
+ for b in bases:
+ for k,v in b.__dict__.iteritems():
+ if k not in dct and isinstance(v,ASN1Tag):
+ dct[k] = v.clone()
+
+ rdict = {}
+ for k,v in dct.iteritems():
+ if type(v) is int:
+ v = ASN1Tag(k,v)
+ dct[k] = v
+ rdict[v] = v
+ elif isinstance(v, ASN1Tag):
+ rdict[v] = v
+ dct["__rdict__"] = rdict
+
+ cls = type.__new__(cls, name, bases, dct)
+ for v in cls.__dict__.values():
+ if isinstance(v, ASN1Tag):
+ v.context = cls # overwrite ASN1Tag contexts, even cloned ones
+ return cls
+
+
+class ASN1_Class:
+ __metaclass__ = ASN1_Class_metaclass
+
+class ASN1_Class_UNIVERSAL(ASN1_Class):
+ name = "UNIVERSAL"
+ ERROR = -3
+ RAW = -2
+ NONE = -1
+ ANY = 0
+ BOOLEAN = 1
+ INTEGER = 2
+ BIT_STRING = 3
+ STRING = 4
+ NULL = 5
+ OID = 6
+ OBJECT_DESCRIPTOR = 7
+ EXTERNAL = 8
+ REAL = 9
+ ENUMERATED = 10
+ EMBEDDED_PDF = 11
+ UTF8_STRING = 12
+ RELATIVE_OID = 13
+ SEQUENCE = 0x30#XXX 16 ??
+ SET = 0x31 #XXX 17 ??
+ NUMERIC_STRING = 18
+ PRINTABLE_STRING = 19
+ T61_STRING = 20
+ VIDEOTEX_STRING = 21
+ IA5_STRING = 22
+ UTC_TIME = 23
+ GENERALIZED_TIME = 24
+ GRAPHIC_STRING = 25
+ ISO646_STRING = 26
+ GENERAL_STRING = 27
+ UNIVERSAL_STRING = 28
+ CHAR_STRING = 29
+ BMP_STRING = 30
+ IPADDRESS = 0x40
+ COUNTER32 = 0x41
+ GAUGE32 = 0x42
+ TIME_TICKS = 0x43
+ SEP = 0x80
+
+class ASN1_Object_metaclass(type):
+ def __new__(cls, name, bases, dct):
+ c = super(ASN1_Object_metaclass, cls).__new__(cls, name, bases, dct)
+ try:
+ c.tag.register_asn1_object(c)
+ except:
+ warning("Error registering %r for %r" % (c.tag, c.codec))
+ return c
+
+
+class ASN1_Object:
+ __metaclass__ = ASN1_Object_metaclass
+ tag = ASN1_Class_UNIVERSAL.ANY
+ def __init__(self, val):
+ self.val = val
+ def enc(self, codec):
+ return self.tag.get_codec(codec).enc(self.val)
+ def __repr__(self):
+ return "<%s[%r]>" % (self.__dict__.get("name", self.__class__.__name__), self.val)
+ def __str__(self):
+ return self.enc(conf.ASN1_default_codec)
+ def strshow(self, lvl=0):
+ return (" "*lvl)+repr(self)+"\n"
+ def show(self, lvl=0):
+ print self.strshow(lvl)
+ def __eq__(self, other):
+ return self.val == other
+ def __cmp__(self, other):
+ return cmp(self.val, other)
+
+class ASN1_DECODING_ERROR(ASN1_Object):
+ tag = ASN1_Class_UNIVERSAL.ERROR
+ def __init__(self, val, exc=None):
+ ASN1_Object.__init__(self, val)
+ self.exc = exc
+ def __repr__(self):
+ return "<%s[%r]{{%s}}>" % (self.__dict__.get("name", self.__class__.__name__),
+ self.val, self.exc.args[0])
+ def enc(self, codec):
+ if isinstance(self.val, ASN1_Object):
+ return self.val.enc(codec)
+ return self.val
+
+class ASN1_force(ASN1_Object):
+ tag = ASN1_Class_UNIVERSAL.RAW
+ def enc(self, codec):
+ if isinstance(self.val, ASN1_Object):
+ return self.val.enc(codec)
+ return self.val
+
+class ASN1_BADTAG(ASN1_force):
+ pass
+
+class ASN1_INTEGER(ASN1_Object):
+ tag = ASN1_Class_UNIVERSAL.INTEGER
+
+class ASN1_STRING(ASN1_Object):
+ tag = ASN1_Class_UNIVERSAL.STRING
+
+class ASN1_BIT_STRING(ASN1_STRING):
+ tag = ASN1_Class_UNIVERSAL.BIT_STRING
+
+class ASN1_PRINTABLE_STRING(ASN1_STRING):
+ tag = ASN1_Class_UNIVERSAL.PRINTABLE_STRING
+
+class ASN1_T61_STRING(ASN1_STRING):
+ tag = ASN1_Class_UNIVERSAL.T61_STRING
+
+class ASN1_IA5_STRING(ASN1_STRING):
+ tag = ASN1_Class_UNIVERSAL.IA5_STRING
+
+class ASN1_NUMERIC_STRING(ASN1_STRING):
+ tag = ASN1_Class_UNIVERSAL.NUMERIC_STRING
+
+class ASN1_VIDEOTEX_STRING(ASN1_STRING):
+ tag = ASN1_Class_UNIVERSAL.VIDEOTEX_STRING
+
+class ASN1_IPADDRESS(ASN1_STRING):
+ tag = ASN1_Class_UNIVERSAL.IPADDRESS
+
+class ASN1_UTC_TIME(ASN1_STRING):
+ tag = ASN1_Class_UNIVERSAL.UTC_TIME
+
+class ASN1_GENERALIZED_TIME(ASN1_STRING):
+ tag = ASN1_Class_UNIVERSAL.GENERALIZED_TIME
+
+class ASN1_TIME_TICKS(ASN1_INTEGER):
+ tag = ASN1_Class_UNIVERSAL.TIME_TICKS
+
+class ASN1_BOOLEAN(ASN1_INTEGER):
+ tag = ASN1_Class_UNIVERSAL.BOOLEAN
+
+class ASN1_ENUMERATED(ASN1_INTEGER):
+ tag = ASN1_Class_UNIVERSAL.ENUMERATED
+
+class ASN1_NULL(ASN1_INTEGER):
+ tag = ASN1_Class_UNIVERSAL.NULL
+
+class ASN1_SEP(ASN1_NULL):
+ tag = ASN1_Class_UNIVERSAL.SEP
+
+class ASN1_GAUGE32(ASN1_INTEGER):
+ tag = ASN1_Class_UNIVERSAL.GAUGE32
+
+class ASN1_COUNTER32(ASN1_INTEGER):
+ tag = ASN1_Class_UNIVERSAL.COUNTER32
+
+class ASN1_SEQUENCE(ASN1_Object):
+ tag = ASN1_Class_UNIVERSAL.SEQUENCE
+ def strshow(self, lvl=0):
+ s = (" "*lvl)+("# %s:" % self.__class__.__name__)+"\n"
+ for o in self.val:
+ s += o.strshow(lvl=lvl+1)
+ return s
+
+class ASN1_SET(ASN1_SEQUENCE):
+ tag = ASN1_Class_UNIVERSAL.SET
+
+class ASN1_OID(ASN1_Object):
+ tag = ASN1_Class_UNIVERSAL.OID
+ def __init__(self, val):
+ val = conf.mib._oid(val)
+ ASN1_Object.__init__(self, val)
+ def __repr__(self):
+ return "<%s[%r]>" % (self.__dict__.get("name", self.__class__.__name__), conf.mib._oidname(self.val))
+ def __oidname__(self):
+ return '%s'%conf.mib._oidname(self.val)
+
+
+
+conf.ASN1_default_codec = ASN1_Codecs.BER
diff --git a/scripts/external_libs/scapy-2.3.1/python2/scapy/asn1/ber.py b/scripts/external_libs/scapy-2.3.1/python2/scapy/asn1/ber.py
new file mode 100644
index 00000000..2312e025
--- /dev/null
+++ b/scripts/external_libs/scapy-2.3.1/python2/scapy/asn1/ber.py
@@ -0,0 +1,363 @@
+## This file is part of Scapy
+## See http://www.secdev.org/projects/scapy for more informations
+## Copyright (C) Philippe Biondi <phil@secdev.org>
+## This program is published under a GPLv2 license
+
+"""
+Basic Encoding Rules (BER) for ASN.1
+"""
+
+from scapy.error import warning
+from scapy.utils import inet_aton,inet_ntoa
+from asn1 import ASN1_Decoding_Error,ASN1_Encoding_Error,ASN1_BadTag_Decoding_Error,ASN1_Codecs,ASN1_Class_UNIVERSAL,ASN1_Error,ASN1_DECODING_ERROR,ASN1_BADTAG
+
+##################
+## BER encoding ##
+##################
+
+
+
+#####[ BER tools ]#####
+
+
+class BER_Exception(Exception):
+ pass
+
+class BER_Encoding_Error(ASN1_Encoding_Error):
+ def __init__(self, msg, encoded=None, remaining=None):
+ Exception.__init__(self, msg)
+ self.remaining = remaining
+ self.encoded = encoded
+ def __str__(self):
+ s = Exception.__str__(self)
+ if isinstance(self.encoded, BERcodec_Object):
+ s+="\n### Already encoded ###\n%s" % self.encoded.strshow()
+ else:
+ s+="\n### Already encoded ###\n%r" % self.encoded
+ s+="\n### Remaining ###\n%r" % self.remaining
+ return s
+
+class BER_Decoding_Error(ASN1_Decoding_Error):
+ def __init__(self, msg, decoded=None, remaining=None):
+ Exception.__init__(self, msg)
+ self.remaining = remaining
+ self.decoded = decoded
+ def __str__(self):
+ s = Exception.__str__(self)
+ if isinstance(self.decoded, BERcodec_Object):
+ s+="\n### Already decoded ###\n%s" % self.decoded.strshow()
+ else:
+ s+="\n### Already decoded ###\n%r" % self.decoded
+ s+="\n### Remaining ###\n%r" % self.remaining
+ return s
+
+class BER_BadTag_Decoding_Error(BER_Decoding_Error, ASN1_BadTag_Decoding_Error):
+ pass
+
+def BER_len_enc(l, size=0):
+ if l <= 127 and size==0:
+ return chr(l)
+ s = ""
+ while l or size>0:
+ s = chr(l&0xff)+s
+ l >>= 8L
+ size -= 1
+ if len(s) > 127:
+ raise BER_Exception("BER_len_enc: Length too long (%i) to be encoded [%r]" % (len(s),s))
+ return chr(len(s)|0x80)+s
+def BER_len_dec(s):
+ l = ord(s[0])
+ if not l & 0x80:
+ return l,s[1:]
+ l &= 0x7f
+ if len(s) <= l:
+ raise BER_Decoding_Error("BER_len_dec: Got %i bytes while expecting %i" % (len(s)-1, l),remaining=s)
+ ll = 0L
+ for c in s[1:l+1]:
+ ll <<= 8L
+ ll |= ord(c)
+ return ll,s[l+1:]
+
+def BER_num_enc(l, size=1):
+ x=[]
+ while l or size>0:
+ x.insert(0, l & 0x7f)
+ if len(x) > 1:
+ x[0] |= 0x80
+ l >>= 7
+ size -= 1
+ return "".join([chr(k) for k in x])
+def BER_num_dec(s):
+ x = 0
+ for i in range(len(s)):
+ c = ord(s[i])
+ x <<= 7
+ x |= c&0x7f
+ if not c&0x80:
+ break
+ if c&0x80:
+ raise BER_Decoding_Error("BER_num_dec: unfinished number description", remaining=s)
+ return x, s[i+1:]
+
+#####[ BER classes ]#####
+
+class BERcodec_metaclass(type):
+ def __new__(cls, name, bases, dct):
+ c = super(BERcodec_metaclass, cls).__new__(cls, name, bases, dct)
+ try:
+ c.tag.register(c.codec, c)
+ except:
+ warning("Error registering %r for %r" % (c.tag, c.codec))
+ return c
+
+
+class BERcodec_Object:
+ __metaclass__ = BERcodec_metaclass
+ codec = ASN1_Codecs.BER
+ tag = ASN1_Class_UNIVERSAL.ANY
+
+ @classmethod
+ def asn1_object(cls, val):
+ return cls.tag.asn1_object(val)
+
+ @classmethod
+ def check_string(cls, s):
+ if not s:
+ raise BER_Decoding_Error("%s: Got empty object while expecting tag %r" %
+ (cls.__name__,cls.tag), remaining=s)
+ @classmethod
+ def check_type(cls, s):
+ cls.check_string(s)
+ if cls.tag != ord(s[0]):
+ raise BER_BadTag_Decoding_Error("%s: Got tag [%i/%#x] while expecting %r" %
+ (cls.__name__, ord(s[0]), ord(s[0]),cls.tag), remaining=s)
+ return s[1:]
+ @classmethod
+ def check_type_get_len(cls, s):
+ s2 = cls.check_type(s)
+ if not s2:
+ raise BER_Decoding_Error("%s: No bytes while expecting a length" %
+ cls.__name__, remaining=s)
+ return BER_len_dec(s2)
+ @classmethod
+ def check_type_check_len(cls, s):
+ l,s3 = cls.check_type_get_len(s)
+ if len(s3) < l:
+ raise BER_Decoding_Error("%s: Got %i bytes while expecting %i" %
+ (cls.__name__, len(s3), l), remaining=s)
+ return l,s3[:l],s3[l:]
+
+ @classmethod
+ def do_dec(cls, s, context=None, safe=False):
+ if context is None:
+ context = cls.tag.context
+ cls.check_string(s)
+ p = ord(s[0])
+ if p not in context:
+ t = s
+ if len(t) > 18:
+ t = t[:15]+"..."
+ raise BER_Decoding_Error("Unknown prefix [%02x] for [%r]" % (p,t), remaining=s)
+ codec = context[p].get_codec(ASN1_Codecs.BER)
+ return codec.dec(s,context,safe)
+
+ @classmethod
+ def dec(cls, s, context=None, safe=False):
+ if not safe:
+ return cls.do_dec(s, context, safe)
+ try:
+ return cls.do_dec(s, context, safe)
+ except BER_BadTag_Decoding_Error,e:
+ o,remain = BERcodec_Object.dec(e.remaining, context, safe)
+ return ASN1_BADTAG(o),remain
+ except BER_Decoding_Error, e:
+ return ASN1_DECODING_ERROR(s, exc=e),""
+ except ASN1_Error, e:
+ return ASN1_DECODING_ERROR(s, exc=e),""
+
+ @classmethod
+ def safedec(cls, s, context=None):
+ return cls.dec(s, context, safe=True)
+
+
+ @classmethod
+ def enc(cls, s):
+ if type(s) is str:
+ return BERcodec_STRING.enc(s)
+ else:
+ return BERcodec_INTEGER.enc(int(s))
+
+
+
+ASN1_Codecs.BER.register_stem(BERcodec_Object)
+
+
+class BERcodec_INTEGER(BERcodec_Object):
+ tag = ASN1_Class_UNIVERSAL.INTEGER
+ @classmethod
+ def enc(cls, i):
+ s = []
+ while 1:
+ s.append(i&0xff)
+ if -127 <= i < 0:
+ break
+ if 128 <= i <= 255:
+ s.append(0)
+ i >>= 8
+ if not i:
+ break
+ s = map(chr, s)
+ s.append(BER_len_enc(len(s)))
+ s.append(chr(cls.tag))
+ s.reverse()
+ return "".join(s)
+ @classmethod
+ def do_dec(cls, s, context=None, safe=False):
+ l,s,t = cls.check_type_check_len(s)
+ x = 0L
+ if s:
+ if ord(s[0])&0x80: # negative int
+ x = -1L
+ for c in s:
+ x <<= 8
+ x |= ord(c)
+ return cls.asn1_object(x),t
+
+
+class BERcodec_BOOLEAN(BERcodec_INTEGER):
+ tag = ASN1_Class_UNIVERSAL.BOOLEAN
+
+class BERcodec_ENUMERATED(BERcodec_INTEGER):
+ tag = ASN1_Class_UNIVERSAL.ENUMERATED
+
+class BERcodec_NULL(BERcodec_INTEGER):
+ tag = ASN1_Class_UNIVERSAL.NULL
+ @classmethod
+ def enc(cls, i):
+ if i == 0:
+ return chr(cls.tag)+"\0"
+ else:
+ return BERcodec_INTEGER.enc(i)
+
+class BERcodec_SEP(BERcodec_NULL):
+ tag = ASN1_Class_UNIVERSAL.SEP
+
+class BERcodec_STRING(BERcodec_Object):
+ tag = ASN1_Class_UNIVERSAL.STRING
+ @classmethod
+ def enc(cls,s):
+ return chr(cls.tag)+BER_len_enc(len(s))+s
+ @classmethod
+ def do_dec(cls, s, context=None, safe=False):
+ l,s,t = cls.check_type_check_len(s)
+ return cls.tag.asn1_object(s),t
+
+class BERcodec_BIT_STRING(BERcodec_STRING):
+ tag = ASN1_Class_UNIVERSAL.BIT_STRING
+
+class BERcodec_PRINTABLE_STRING(BERcodec_STRING):
+ tag = ASN1_Class_UNIVERSAL.PRINTABLE_STRING
+
+class BERcodec_T61_STRING (BERcodec_STRING):
+ tag = ASN1_Class_UNIVERSAL.T61_STRING
+
+class BERcodec_IA5_STRING(BERcodec_STRING):
+ tag = ASN1_Class_UNIVERSAL.IA5_STRING
+
+class BERcodec_NUMERIC_STRING(BERcodec_STRING):
+ tag = ASN1_Class_UNIVERSAL.NUMERIC_STRING
+
+class BERcodec_VIDEOTEX_STRING(BERcodec_STRING):
+ tag = ASN1_Class_UNIVERSAL.VIDEOTEX_STRING
+
+class BERcodec_IPADDRESS(BERcodec_STRING):
+ tag = ASN1_Class_UNIVERSAL.IPADDRESS
+
+ @classmethod
+ def enc(cls, ipaddr_ascii):
+ try:
+ s = inet_aton(ipaddr_ascii)
+ except Exception:
+ raise BER_Encoding_Error("IPv4 address could not be encoded")
+ return chr(cls.tag)+BER_len_enc(len(s))+s
+
+ @classmethod
+ def do_dec(cls, s, context=None, safe=False):
+ l,s,t = cls.check_type_check_len(s)
+ try:
+ ipaddr_ascii = inet_ntoa(s)
+ except Exception:
+ raise BER_Decoding_Error("IP address could not be decoded", decoded=obj)
+ return cls.asn1_object(ipaddr_ascii), t
+
+class BERcodec_UTC_TIME(BERcodec_STRING):
+ tag = ASN1_Class_UNIVERSAL.UTC_TIME
+
+class BERcodec_GENERALIZED_TIME(BERcodec_STRING):
+ tag = ASN1_Class_UNIVERSAL.GENERALIZED_TIME
+
+class BERcodec_TIME_TICKS(BERcodec_INTEGER):
+ tag = ASN1_Class_UNIVERSAL.TIME_TICKS
+
+class BERcodec_GAUGE32(BERcodec_INTEGER):
+ tag = ASN1_Class_UNIVERSAL.GAUGE32
+
+class BERcodec_COUNTER32(BERcodec_INTEGER):
+ tag = ASN1_Class_UNIVERSAL.COUNTER32
+
+class BERcodec_SEQUENCE(BERcodec_Object):
+ tag = ASN1_Class_UNIVERSAL.SEQUENCE
+ @classmethod
+ def enc(cls, l):
+ if type(l) is not str:
+ l = "".join(map(lambda x: x.enc(cls.codec), l))
+ return chr(cls.tag)+BER_len_enc(len(l))+l
+ @classmethod
+ def do_dec(cls, s, context=None, safe=False):
+ if context is None:
+ context = cls.tag.context
+ l,st = cls.check_type_get_len(s) # we may have len(s) < l
+ s,t = st[:l],st[l:]
+ obj = []
+ while s:
+ try:
+ o,s = BERcodec_Object.dec(s, context, safe)
+ except BER_Decoding_Error, err:
+ err.remaining += t
+ if err.decoded is not None:
+ obj.append(err.decoded)
+ err.decoded = obj
+ raise
+ obj.append(o)
+ if len(st) < l:
+ raise BER_Decoding_Error("Not enough bytes to decode sequence", decoded=obj)
+ return cls.asn1_object(obj),t
+
+class BERcodec_SET(BERcodec_SEQUENCE):
+ tag = ASN1_Class_UNIVERSAL.SET
+
+
+class BERcodec_OID(BERcodec_Object):
+ tag = ASN1_Class_UNIVERSAL.OID
+
+ @classmethod
+ def enc(cls, oid):
+ lst = [int(x) for x in oid.strip(".").split(".")]
+ if len(lst) >= 2:
+ lst[1] += 40*lst[0]
+ del(lst[0])
+ s = "".join([BER_num_enc(k) for k in lst])
+ return chr(cls.tag)+BER_len_enc(len(s))+s
+ @classmethod
+ def do_dec(cls, s, context=None, safe=False):
+ l,s,t = cls.check_type_check_len(s)
+ lst = []
+ while s:
+ l,s = BER_num_dec(s)
+ lst.append(l)
+ if (len(lst) > 0):
+ lst.insert(0,lst[0]/40)
+ lst[1] %= 40
+ return cls.asn1_object(".".join([str(k) for k in lst])), t
+
+
diff --git a/scripts/external_libs/scapy-2.3.1/python2/scapy/asn1/mib.py b/scripts/external_libs/scapy-2.3.1/python2/scapy/asn1/mib.py
new file mode 100644
index 00000000..8531fcf2
--- /dev/null
+++ b/scripts/external_libs/scapy-2.3.1/python2/scapy/asn1/mib.py
@@ -0,0 +1,147 @@
+## This file is part of Scapy
+## See http://www.secdev.org/projects/scapy for more informations
+## Copyright (C) Philippe Biondi <phil@secdev.org>
+## This program is published under a GPLv2 license
+
+"""
+Management Information Base (MIB) parsing
+"""
+
+import re
+from glob import glob
+from scapy.dadict import DADict,fixname
+from scapy.config import conf
+from scapy.utils import do_graph
+
+#################
+## MIB parsing ##
+#################
+
+_mib_re_integer = re.compile("^[0-9]+$")
+_mib_re_both = re.compile("^([a-zA-Z_][a-zA-Z0-9_-]*)\(([0-9]+)\)$")
+_mib_re_oiddecl = re.compile("$\s*([a-zA-Z0-9_-]+)\s+OBJECT([^:\{\}]|\{[^:]+\})+::=\s*\{([^\}]+)\}",re.M)
+_mib_re_strings = re.compile('"[^"]*"')
+_mib_re_comments = re.compile('--.*(\r|\n)')
+
+class MIBDict(DADict):
+ def _findroot(self, x):
+ if x.startswith("."):
+ x = x[1:]
+ if not x.endswith("."):
+ x += "."
+ max=0
+ root="."
+ for k in self.keys():
+ if x.startswith(self[k]+"."):
+ if max < len(self[k]):
+ max = len(self[k])
+ root = k
+ return root, x[max:-1]
+ def _oidname(self, x):
+ root,remainder = self._findroot(x)
+ return root+remainder
+ def _oid(self, x):
+ xl = x.strip(".").split(".")
+ p = len(xl)-1
+ while p >= 0 and _mib_re_integer.match(xl[p]):
+ p -= 1
+ if p != 0 or xl[p] not in self:
+ return x
+ xl[p] = self[xl[p]]
+ return ".".join(xl[p:])
+ def _make_graph(self, other_keys=[], **kargs):
+ nodes = [(k,self[k]) for k in self.keys()]
+ oids = [self[k] for k in self.keys()]
+ for k in other_keys:
+ if k not in oids:
+ nodes.append(self.oidname(k),k)
+ s = 'digraph "mib" {\n\trankdir=LR;\n\n'
+ for k,o in nodes:
+ s += '\t"%s" [ label="%s" ];\n' % (o,k)
+ s += "\n"
+ for k,o in nodes:
+ parent,remainder = self._findroot(o[:-1])
+ remainder = remainder[1:]+o[-1]
+ if parent != ".":
+ parent = self[parent]
+ s += '\t"%s" -> "%s" [label="%s"];\n' % (parent, o,remainder)
+ s += "}\n"
+ do_graph(s, **kargs)
+ def __len__(self):
+ return len(self.keys())
+
+
+def mib_register(ident, value, the_mib, unresolved):
+ if ident in the_mib or ident in unresolved:
+ return ident in the_mib
+ resval = []
+ not_resolved = 0
+ for v in value:
+ if _mib_re_integer.match(v):
+ resval.append(v)
+ else:
+ v = fixname(v)
+ if v not in the_mib:
+ not_resolved = 1
+ if v in the_mib:
+ v = the_mib[v]
+ elif v in unresolved:
+ v = unresolved[v]
+ if type(v) is list:
+ resval += v
+ else:
+ resval.append(v)
+ if not_resolved:
+ unresolved[ident] = resval
+ return False
+ else:
+ the_mib[ident] = resval
+ keys = unresolved.keys()
+ i = 0
+ while i < len(keys):
+ k = keys[i]
+ if mib_register(k,unresolved[k], the_mib, {}):
+ del(unresolved[k])
+ del(keys[i])
+ i = 0
+ else:
+ i += 1
+
+ return True
+
+
+def load_mib(filenames):
+ the_mib = {'iso': ['1']}
+ unresolved = {}
+ for k in conf.mib.keys():
+ mib_register(k, conf.mib[k].split("."), the_mib, unresolved)
+
+ if type(filenames) is str:
+ filenames = [filenames]
+ for fnames in filenames:
+ for fname in glob(fnames):
+ f = open(fname)
+ text = f.read()
+ cleantext = " ".join(_mib_re_strings.split(" ".join(_mib_re_comments.split(text))))
+ for m in _mib_re_oiddecl.finditer(cleantext):
+ gr = m.groups()
+ ident,oid = gr[0],gr[-1]
+ ident=fixname(ident)
+ oid = oid.split()
+ for i in range(len(oid)):
+ m = _mib_re_both.match(oid[i])
+ if m:
+ oid[i] = m.groups()[1]
+ mib_register(ident, oid, the_mib, unresolved)
+
+ newmib = MIBDict(_name="MIB")
+ for k,o in the_mib.iteritems():
+ newmib[k]=".".join(o)
+ for k,o in unresolved.iteritems():
+ newmib[k]=".".join(o)
+
+ conf.mib=newmib
+
+
+
+conf.mib = MIBDict(_name="MIB")