summaryrefslogtreecommitdiffstats
path: root/external_libs/python/dpkt-1.8.6.2/dpkt/dns.py
diff options
context:
space:
mode:
Diffstat (limited to 'external_libs/python/dpkt-1.8.6.2/dpkt/dns.py')
-rw-r--r--external_libs/python/dpkt-1.8.6.2/dpkt/dns.py462
1 files changed, 462 insertions, 0 deletions
diff --git a/external_libs/python/dpkt-1.8.6.2/dpkt/dns.py b/external_libs/python/dpkt-1.8.6.2/dpkt/dns.py
new file mode 100644
index 00000000..d4d08e47
--- /dev/null
+++ b/external_libs/python/dpkt-1.8.6.2/dpkt/dns.py
@@ -0,0 +1,462 @@
+# $Id: dns.py 27 2006-11-21 01:22:52Z dahelder $
+# -*- coding: utf-8 -*-
+"""Domain Name System."""
+
+import struct
+import dpkt
+from decorators import deprecated
+
+DNS_Q = 0
+DNS_R = 1
+
+# Opcodes
+DNS_QUERY = 0
+DNS_IQUERY = 1
+DNS_STATUS = 2
+DNS_NOTIFY = 4
+DNS_UPDATE = 5
+
+# Flags
+DNS_CD = 0x0010 # checking disabled
+DNS_AD = 0x0020 # authenticated data
+DNS_Z = 0x0040 # unused
+DNS_RA = 0x0080 # recursion available
+DNS_RD = 0x0100 # recursion desired
+DNS_TC = 0x0200 # truncated
+DNS_AA = 0x0400 # authoritative answer
+DNS_QR = 0x8000 # response ( query / response )
+
+# Response codes
+DNS_RCODE_NOERR = 0
+DNS_RCODE_FORMERR = 1
+DNS_RCODE_SERVFAIL = 2
+DNS_RCODE_NXDOMAIN = 3
+DNS_RCODE_NOTIMP = 4
+DNS_RCODE_REFUSED = 5
+DNS_RCODE_YXDOMAIN = 6
+DNS_RCODE_YXRRSET = 7
+DNS_RCODE_NXRRSET = 8
+DNS_RCODE_NOTAUTH = 9
+DNS_RCODE_NOTZONE = 10
+
+# RR types
+DNS_A = 1
+DNS_NS = 2
+DNS_CNAME = 5
+DNS_SOA = 6
+DNS_PTR = 12
+DNS_HINFO = 13
+DNS_MX = 15
+DNS_TXT = 16
+DNS_AAAA = 28
+DNS_SRV = 33
+
+# RR classes
+DNS_IN = 1
+DNS_CHAOS = 3
+DNS_HESIOD = 4
+DNS_ANY = 255
+
+
+def pack_name(name, off, label_ptrs):
+ if name:
+ labels = name.split('.')
+ else:
+ labels = []
+ labels.append('')
+ buf = ''
+ for i, label in enumerate(labels):
+ key = '.'.join(labels[i:]).upper()
+ ptr = label_ptrs.get(key)
+ if not ptr:
+ if len(key) > 1:
+ ptr = off + len(buf)
+ if ptr < 0xc000:
+ label_ptrs[key] = ptr
+ i = len(label)
+ buf += chr(i) + label
+ else:
+ buf += struct.pack('>H', (0xc000 | ptr))
+ break
+ return buf
+
+
+def unpack_name(buf, off):
+ name = ''
+ saved_off = 0
+ for _ in range(100): # XXX
+ n = ord(buf[off])
+ if n == 0:
+ off += 1
+ break
+ elif (n & 0xc0) == 0xc0:
+ ptr = struct.unpack('>H', buf[off:off + 2])[0] & 0x3fff
+ off += 2
+ if not saved_off:
+ saved_off = off
+ # XXX - don't use recursion!@#$
+ name = name + unpack_name(buf, ptr)[0] + '.'
+ break
+ else:
+ off += 1
+ name = name + buf[off:off + n] + '.'
+ if len(name) > 255:
+ raise dpkt.UnpackError('name longer than 255 bytes')
+ off += n
+ return name.strip('.'), off
+
+
+class DNS(dpkt.Packet):
+ __hdr__ = (
+ ('id', 'H', 0),
+ ('op', 'H', DNS_RD), # recursive query
+ # XXX - lists of query, RR objects
+ ('qd', 'H', []),
+ ('an', 'H', []),
+ ('ns', 'H', []),
+ ('ar', 'H', [])
+ )
+
+ @property
+ def qr(self):
+ return int((self.op & DNS_QR) == DNS_QR)
+
+ @qr.setter
+ def qr(self, v):
+ if v:
+ self.op |= DNS_QR
+ else:
+ self.op &= ~DNS_QR
+
+ @property
+ def opcode(self):
+ return (self.op >> 11) & 0xf
+
+ @opcode.setter
+ def opcode(self, v):
+ self.op = (self.op & ~0x7800) | ((v & 0xf) << 11)
+
+ @property
+ def aa(self):
+ return int((self.op & DNS_AA) == DNS_AA)
+
+ @aa.setter
+ def aa(self, v):
+ if v:
+ self.op |= DNS_AA
+ else:
+ self.op &= ~DNS_AA
+
+ @property
+ def rd(self):
+ return int((self.op & DNS_RD) == DNS_RD)
+
+ @rd.setter
+ def rd(self, v):
+ if v:
+ self.op |= DNS_RD
+ else:
+ self.op &= ~DNS_RD
+
+ @property
+ def ra(self):
+ return int((self.op & DNS_RA) == DNS_RA)
+
+ @ra.setter
+ def ra(self, v):
+ if v:
+ self.op |= DNS_RA
+ else:
+ self.op &= ~DNS_RA
+
+ @property
+ def zero(self):
+ return int((self.op & DNS_Z) == DNS_Z)
+
+ @zero.setter
+ def zero(self, v):
+ if v:
+ self.op |= DNS_Z
+ else:
+ self.op &= ~DNS_Z
+
+ @property
+ def rcode(self):
+ return self.op & 0xf
+
+ @rcode.setter
+ def rcode(self, v):
+ self.op = (self.op & ~0xf) | (v & 0xf)
+
+ # Deprecated methods, will be removed in the future
+ # ======================================================
+ @deprecated
+ def get_qr(self):
+ return self.qr
+
+ @deprecated
+ def set_qr(self, v):
+ self.qr = v
+
+ @deprecated
+ def get_opcode(self):
+ return self.opcode
+
+ @deprecated
+ def set_opcode(self, v):
+ self.opcode = v
+
+ @deprecated
+ def get_aa(self):
+ return self.aa
+
+ @deprecated
+ def set_aa(self, v):
+ self.aa = v
+
+ @deprecated
+ def get_rd(self):
+ return self.rd
+
+ @deprecated
+ def set_rd(self, v):
+ self.rd = v
+
+ @deprecated
+ def get_ra(self):
+ return self.ra
+
+ @deprecated
+ def set_ra(self, v):
+ self.ra = v
+
+ @deprecated
+ def get_zero(self):
+ return self.zero
+
+ @deprecated
+ def set_zero(self, v):
+ self.zero = v
+
+ @deprecated
+ def get_rcode(self):
+ return self.rcode
+
+ @deprecated
+ def set_rcode(self, v):
+ self.rcode = v
+
+ # ======================================================
+
+ class Q(dpkt.Packet):
+ """DNS question."""
+ __hdr__ = (
+ ('name', '1025s', ''),
+ ('type', 'H', DNS_A),
+ ('cls', 'H', DNS_IN)
+ )
+
+ # XXX - suk
+ def __len__(self):
+ raise NotImplementedError
+
+ __str__ = __len__
+
+ def unpack(self, buf):
+ raise NotImplementedError
+
+ class RR(Q):
+ """DNS resource record."""
+ __hdr__ = (
+ ('name', '1025s', ''),
+ ('type', 'H', DNS_A),
+ ('cls', 'H', DNS_IN),
+ ('ttl', 'I', 0),
+ ('rlen', 'H', 4),
+ ('rdata', 's', '')
+ )
+
+ def pack_rdata(self, off, label_ptrs):
+ # XXX - yeah, this sux
+ if self.rdata:
+ return self.rdata
+ if self.type == DNS_A:
+ return self.ip
+ elif self.type == DNS_NS:
+ return pack_name(self.nsname, off, label_ptrs)
+ elif self.type == DNS_CNAME:
+ return pack_name(self.cname, off, label_ptrs)
+ elif self.type == DNS_PTR:
+ return pack_name(self.ptrname, off, label_ptrs)
+ elif self.type == DNS_SOA:
+ l = []
+ l.append(pack_name(self.mname, off, label_ptrs))
+ l.append(pack_name(self.rname, off + len(l[0]), label_ptrs))
+ l.append(struct.pack('>IIIII', self.serial, self.refresh,
+ self.retry, self.expire, self.minimum))
+ return ''.join(l)
+ elif self.type == DNS_MX:
+ return struct.pack('>H', self.preference) + \
+ pack_name(self.mxname, off + 2, label_ptrs)
+ elif self.type == DNS_TXT or self.type == DNS_HINFO:
+ return ''.join(['%s%s' % (chr(len(x)), x)
+ for x in self.text])
+ elif self.type == DNS_AAAA:
+ return self.ip6
+ elif self.type == DNS_SRV:
+ return struct.pack('>HHH', self.priority, self.weight, self.port) + \
+ pack_name(self.srvname, off + 6, label_ptrs)
+
+ def unpack_rdata(self, buf, off):
+ if self.type == DNS_A:
+ self.ip = self.rdata
+ elif self.type == DNS_NS:
+ self.nsname, off = unpack_name(buf, off)
+ elif self.type == DNS_CNAME:
+ self.cname, off = unpack_name(buf, off)
+ elif self.type == DNS_PTR:
+ self.ptrname, off = unpack_name(buf, off)
+ elif self.type == DNS_SOA:
+ self.mname, off = unpack_name(buf, off)
+ self.rname, off = unpack_name(buf, off)
+ self.serial, self.refresh, self.retry, self.expire, \
+ self.minimum = struct.unpack('>IIIII', buf[off:off + 20])
+ elif self.type == DNS_MX:
+ self.preference = struct.unpack('>H', self.rdata[:2])
+ self.mxname, off = unpack_name(buf, off + 2)
+ elif self.type == DNS_TXT or self.type == DNS_HINFO:
+ self.text = []
+ buf = self.rdata
+ while buf:
+ n = ord(buf[0])
+ self.text.append(buf[1:1 + n])
+ buf = buf[1 + n:]
+ elif self.type == DNS_AAAA:
+ self.ip6 = self.rdata
+ elif self.type == DNS_SRV:
+ self.priority, self.weight, self.port = struct.unpack('>HHH', self.rdata[:6])
+ self.srvname, off = unpack_name(buf, off + 6)
+
+ def pack_q(self, buf, q):
+ """Append packed DNS question and return buf."""
+ return buf + pack_name(q.name, len(buf), self.label_ptrs) + struct.pack('>HH', q.type, q.cls)
+
+ def unpack_q(self, buf, off):
+ """Return DNS question and new offset."""
+ q = self.Q()
+ q.name, off = unpack_name(buf, off)
+ q.type, q.cls = struct.unpack('>HH', buf[off:off + 4])
+ off += 4
+ return q, off
+
+ def pack_rr(self, buf, rr):
+ """Append packed DNS RR and return buf."""
+ name = pack_name(rr.name, len(buf), self.label_ptrs)
+ rdata = rr.pack_rdata(len(buf) + len(name) + 10, self.label_ptrs)
+ return buf + name + struct.pack('>HHIH', rr.type, rr.cls, rr.ttl, len(rdata)) + rdata
+
+ def unpack_rr(self, buf, off):
+ """Return DNS RR and new offset."""
+ rr = self.RR()
+ rr.name, off = unpack_name(buf, off)
+ rr.type, rr.cls, rr.ttl, rdlen = struct.unpack('>HHIH', buf[off:off + 10])
+ off += 10
+ rr.rdata = buf[off:off + rdlen]
+ rr.unpack_rdata(buf, off)
+ off += rdlen
+ return rr, off
+
+ def unpack(self, buf):
+ dpkt.Packet.unpack(self, buf)
+ off = self.__hdr_len__
+ cnt = self.qd # FIXME: This relies on this being properly set somewhere else
+ self.qd = []
+ for _ in range(cnt):
+ q, off = self.unpack_q(buf, off)
+ self.qd.append(q)
+ for x in ('an', 'ns', 'ar'):
+ cnt = getattr(self, x, 0)
+ setattr(self, x, [])
+ for _ in range(cnt):
+ rr, off = self.unpack_rr(buf, off)
+ getattr(self, x).append(rr)
+ self.data = ''
+
+ def __len__(self):
+ # XXX - cop out
+ return len(str(self))
+
+ def __str__(self):
+ # XXX - compress names on the fly
+ self.label_ptrs = {}
+ buf = struct.pack(self.__hdr_fmt__, self.id, self.op, len(self.qd),
+ len(self.an), len(self.ns), len(self.ar))
+ for q in self.qd:
+ buf = self.pack_q(buf, q)
+ for x in ('an', 'ns', 'ar'):
+ for rr in getattr(self, x):
+ buf = self.pack_rr(buf, rr)
+ del self.label_ptrs
+ return buf
+
+
+def test_basic():
+ from ip import IP
+
+ s = 'E\x00\x02\x08\xc15\x00\x00\x80\x11\x92aBk0\x01Bk0w\x005\xc07\x01\xf4\xda\xc2d\xd2\x81\x80\x00\x01\x00\x03\x00\x0b\x00\x0b\x03www\x06google\x03com\x00\x00\x01\x00\x01\xc0\x0c\x00\x05\x00\x01\x00\x00\x03V\x00\x17\x03www\x06google\x06akadns\x03net\x00\xc0,\x00\x01\x00\x01\x00\x00\x01\xa3\x00\x04@\xe9\xabh\xc0,\x00\x01\x00\x01\x00\x00\x01\xa3\x00\x04@\xe9\xabc\xc07\x00\x02\x00\x01\x00\x00KG\x00\x0c\x04usw5\x04akam\xc0>\xc07\x00\x02\x00\x01\x00\x00KG\x00\x07\x04usw6\xc0t\xc07\x00\x02\x00\x01\x00\x00KG\x00\x07\x04usw7\xc0t\xc07\x00\x02\x00\x01\x00\x00KG\x00\x08\x05asia3\xc0t\xc07\x00\x02\x00\x01\x00\x00KG\x00\x05\x02za\xc07\xc07\x00\x02\x00\x01\x00\x00KG\x00\x0f\x02zc\x06akadns\x03org\x00\xc07\x00\x02\x00\x01\x00\x00KG\x00\x05\x02zf\xc07\xc07\x00\x02\x00\x01\x00\x00KG\x00\x05\x02zh\xc0\xd5\xc07\x00\x02\x00\x01\x00\x00KG\x00\x07\x04eur3\xc0t\xc07\x00\x02\x00\x01\x00\x00KG\x00\x07\x04use2\xc0t\xc07\x00\x02\x00\x01\x00\x00KG\x00\x07\x04use4\xc0t\xc0\xc1\x00\x01\x00\x01\x00\x00\xfb4\x00\x04\xd0\xb9\x84\xb0\xc0\xd2\x00\x01\x00\x01\x00\x001\x0c\x00\x04?\xf1\xc76\xc0\xed\x00\x01\x00\x01\x00\x00\xfb4\x00\x04?\xd7\xc6S\xc0\xfe\x00\x01\x00\x01\x00\x001\x0c\x00\x04?\xd00.\xc1\x0f\x00\x01\x00\x01\x00\x00\n\xdf\x00\x04\xc1-\x01g\xc1"\x00\x01\x00\x01\x00\x00\x101\x00\x04?\xd1\xaa\x88\xc15\x00\x01\x00\x01\x00\x00\r\x1a\x00\x04PCC\xb6\xc0o\x00\x01\x00\x01\x00\x00\x10\x7f\x00\x04?\xf1I\xd6\xc0\x87\x00\x01\x00\x01\x00\x00\n\xdf\x00\x04\xce\x84dl\xc0\x9a\x00\x01\x00\x01\x00\x00\n\xdf\x00\x04A\xcb\xea\x1b\xc0\xad\x00\x01\x00\x01\x00\x00\x0b)\x00\x04\xc1l\x9a\t'
+ ip = IP(s)
+ my_dns = DNS(ip.udp.data)
+ assert my_dns.qd[0].name == 'www.google.com' and my_dns.an[1].name == 'www.google.akadns.net'
+ s = '\x05\xf5\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00\x03www\x03cnn\x03com\x00\x00\x01\x00\x01'
+ my_dns = DNS(s)
+ assert s == str(my_dns)
+
+
+def test_PTR():
+ s = 'g\x02\x81\x80\x00\x01\x00\x01\x00\x03\x00\x00\x011\x011\x03211\x03141\x07in-addr\x04arpa\x00\x00\x0c\x00\x01\xc0\x0c\x00\x0c\x00\x01\x00\x00\r6\x00$\x07default\nv-umce-ifs\x05umnet\x05umich\x03edu\x00\xc0\x0e\x00\x02\x00\x01\x00\x00\r6\x00\r\x06shabby\x03ifs\xc0O\xc0\x0e\x00\x02\x00\x01\x00\x00\r6\x00\x0f\x0cfish-license\xc0m\xc0\x0e\x00\x02\x00\x01\x00\x00\r6\x00\x0b\x04dns2\x03itd\xc0O'
+ my_dns = DNS(s)
+ assert my_dns.qd[0].name == '1.1.211.141.in-addr.arpa' and \
+ my_dns.an[0].ptrname == 'default.v-umce-ifs.umnet.umich.edu' and \
+ my_dns.ns[0].nsname == 'shabby.ifs.umich.edu' and \
+ my_dns.ns[1].ttl == 3382L and \
+ my_dns.ns[2].nsname == 'dns2.itd.umich.edu'
+ assert s == str(my_dns)
+
+
+def test_pack_name():
+ # Empty name is \0
+ x = pack_name('', 0, {})
+ assert x == '\0'
+
+
+def test_deprecated_methods():
+ """Test deprecated methods. Note: when they are removed so should this test"""
+ s = 'g\x02\x81\x80\x00\x01\x00\x01\x00\x03\x00\x00\x011\x011\x03211\x03141\x07in-addr\x04arpa\x00\x00\x0c\x00\x01\xc0\x0c\x00\x0c\x00\x01\x00\x00\r6\x00$\x07default\nv-umce-ifs\x05umnet\x05umich\x03edu\x00\xc0\x0e\x00\x02\x00\x01\x00\x00\r6\x00\r\x06shabby\x03ifs\xc0O\xc0\x0e\x00\x02\x00\x01\x00\x00\r6\x00\x0f\x0cfish-license\xc0m\xc0\x0e\x00\x02\x00\x01\x00\x00\r6\x00\x0b\x04dns2\x03itd\xc0O'
+ my_dns = DNS(s)
+ my_dns.get_aa()
+ my_dns.get_opcode()
+ qr = my_dns.get_qr()
+ my_dns.set_qr(qr)
+ qr2 = my_dns.get_qr()
+ assert qr == qr2
+
+
+def test_deprecated_method_performance():
+ """Test the performance hit for the deprecation decorator"""
+ from timeit import Timer
+
+ s = 'g\x02\x81\x80\x00\x01\x00\x01\x00\x03\x00\x00\x011\x011\x03211\x03141\x07in-addr\x04arpa\x00\x00\x0c\x00\x01\xc0\x0c\x00\x0c\x00\x01\x00\x00\r6\x00$\x07default\nv-umce-ifs\x05umnet\x05umich\x03edu\x00\xc0\x0e\x00\x02\x00\x01\x00\x00\r6\x00\r\x06shabby\x03ifs\xc0O\xc0\x0e\x00\x02\x00\x01\x00\x00\r6\x00\x0f\x0cfish-license\xc0m\xc0\x0e\x00\x02\x00\x01\x00\x00\r6\x00\x0b\x04dns2\x03itd\xc0O'
+ my_dns = DNS(s)
+ t1 = Timer(lambda: my_dns.aa).timeit(10000)
+ t2 = Timer(my_dns.get_aa).timeit(10000)
+ print 'Performance of dns.aa vs. dns.get_aa(): %f %f' % (t1, t2)
+
+
+if __name__ == '__main__':
+ # Runs all the test associated with this class/file
+ test_basic()
+ test_PTR()
+ test_pack_name()
+ test_deprecated_methods()
+ test_deprecated_method_performance()
+ print 'Tests Successful...'