diff options
Diffstat (limited to 'scripts/automation/trex_control_plane/python_lib/dpkt-1.8.6.2/dpkt/dns.py')
-rw-r--r-- | scripts/automation/trex_control_plane/python_lib/dpkt-1.8.6.2/dpkt/dns.py | 462 |
1 files changed, 0 insertions, 462 deletions
diff --git a/scripts/automation/trex_control_plane/python_lib/dpkt-1.8.6.2/dpkt/dns.py b/scripts/automation/trex_control_plane/python_lib/dpkt-1.8.6.2/dpkt/dns.py deleted file mode 100644 index d4d08e47..00000000 --- a/scripts/automation/trex_control_plane/python_lib/dpkt-1.8.6.2/dpkt/dns.py +++ /dev/null @@ -1,462 +0,0 @@ -# $Id: dns.py 27 2006-11-21 01:22:52Z dahelder $ -# -*- coding: utf-8 -*- -"""Domain Name System.""" - -import struct -import dpkt -from decorators import deprecated - -DNS_Q = 0 -DNS_R = 1 - -# Opcodes -DNS_QUERY = 0 -DNS_IQUERY = 1 -DNS_STATUS = 2 -DNS_NOTIFY = 4 -DNS_UPDATE = 5 - -# Flags -DNS_CD = 0x0010 # checking disabled -DNS_AD = 0x0020 # authenticated data -DNS_Z = 0x0040 # unused -DNS_RA = 0x0080 # recursion available -DNS_RD = 0x0100 # recursion desired -DNS_TC = 0x0200 # truncated -DNS_AA = 0x0400 # authoritative answer -DNS_QR = 0x8000 # response ( query / response ) - -# Response codes -DNS_RCODE_NOERR = 0 -DNS_RCODE_FORMERR = 1 -DNS_RCODE_SERVFAIL = 2 -DNS_RCODE_NXDOMAIN = 3 -DNS_RCODE_NOTIMP = 4 -DNS_RCODE_REFUSED = 5 -DNS_RCODE_YXDOMAIN = 6 -DNS_RCODE_YXRRSET = 7 -DNS_RCODE_NXRRSET = 8 -DNS_RCODE_NOTAUTH = 9 -DNS_RCODE_NOTZONE = 10 - -# RR types -DNS_A = 1 -DNS_NS = 2 -DNS_CNAME = 5 -DNS_SOA = 6 -DNS_PTR = 12 -DNS_HINFO = 13 -DNS_MX = 15 -DNS_TXT = 16 -DNS_AAAA = 28 -DNS_SRV = 33 - -# RR classes -DNS_IN = 1 -DNS_CHAOS = 3 -DNS_HESIOD = 4 -DNS_ANY = 255 - - -def pack_name(name, off, label_ptrs): - if name: - labels = name.split('.') - else: - labels = [] - labels.append('') - buf = '' - for i, label in enumerate(labels): - key = '.'.join(labels[i:]).upper() - ptr = label_ptrs.get(key) - if not ptr: - if len(key) > 1: - ptr = off + len(buf) - if ptr < 0xc000: - label_ptrs[key] = ptr - i = len(label) - buf += chr(i) + label - else: - buf += struct.pack('>H', (0xc000 | ptr)) - break - return buf - - -def unpack_name(buf, off): - name = '' - saved_off = 0 - for _ in range(100): # XXX - n = ord(buf[off]) - if n == 0: - off += 1 - break - elif (n & 0xc0) == 0xc0: - ptr = struct.unpack('>H', buf[off:off + 2])[0] & 0x3fff - off += 2 - if not saved_off: - saved_off = off - # XXX - don't use recursion!@#$ - name = name + unpack_name(buf, ptr)[0] + '.' - break - else: - off += 1 - name = name + buf[off:off + n] + '.' - if len(name) > 255: - raise dpkt.UnpackError('name longer than 255 bytes') - off += n - return name.strip('.'), off - - -class DNS(dpkt.Packet): - __hdr__ = ( - ('id', 'H', 0), - ('op', 'H', DNS_RD), # recursive query - # XXX - lists of query, RR objects - ('qd', 'H', []), - ('an', 'H', []), - ('ns', 'H', []), - ('ar', 'H', []) - ) - - @property - def qr(self): - return int((self.op & DNS_QR) == DNS_QR) - - @qr.setter - def qr(self, v): - if v: - self.op |= DNS_QR - else: - self.op &= ~DNS_QR - - @property - def opcode(self): - return (self.op >> 11) & 0xf - - @opcode.setter - def opcode(self, v): - self.op = (self.op & ~0x7800) | ((v & 0xf) << 11) - - @property - def aa(self): - return int((self.op & DNS_AA) == DNS_AA) - - @aa.setter - def aa(self, v): - if v: - self.op |= DNS_AA - else: - self.op &= ~DNS_AA - - @property - def rd(self): - return int((self.op & DNS_RD) == DNS_RD) - - @rd.setter - def rd(self, v): - if v: - self.op |= DNS_RD - else: - self.op &= ~DNS_RD - - @property - def ra(self): - return int((self.op & DNS_RA) == DNS_RA) - - @ra.setter - def ra(self, v): - if v: - self.op |= DNS_RA - else: - self.op &= ~DNS_RA - - @property - def zero(self): - return int((self.op & DNS_Z) == DNS_Z) - - @zero.setter - def zero(self, v): - if v: - self.op |= DNS_Z - else: - self.op &= ~DNS_Z - - @property - def rcode(self): - return self.op & 0xf - - @rcode.setter - def rcode(self, v): - self.op = (self.op & ~0xf) | (v & 0xf) - - # Deprecated methods, will be removed in the future - # ====================================================== - @deprecated - def get_qr(self): - return self.qr - - @deprecated - def set_qr(self, v): - self.qr = v - - @deprecated - def get_opcode(self): - return self.opcode - - @deprecated - def set_opcode(self, v): - self.opcode = v - - @deprecated - def get_aa(self): - return self.aa - - @deprecated - def set_aa(self, v): - self.aa = v - - @deprecated - def get_rd(self): - return self.rd - - @deprecated - def set_rd(self, v): - self.rd = v - - @deprecated - def get_ra(self): - return self.ra - - @deprecated - def set_ra(self, v): - self.ra = v - - @deprecated - def get_zero(self): - return self.zero - - @deprecated - def set_zero(self, v): - self.zero = v - - @deprecated - def get_rcode(self): - return self.rcode - - @deprecated - def set_rcode(self, v): - self.rcode = v - - # ====================================================== - - class Q(dpkt.Packet): - """DNS question.""" - __hdr__ = ( - ('name', '1025s', ''), - ('type', 'H', DNS_A), - ('cls', 'H', DNS_IN) - ) - - # XXX - suk - def __len__(self): - raise NotImplementedError - - __str__ = __len__ - - def unpack(self, buf): - raise NotImplementedError - - class RR(Q): - """DNS resource record.""" - __hdr__ = ( - ('name', '1025s', ''), - ('type', 'H', DNS_A), - ('cls', 'H', DNS_IN), - ('ttl', 'I', 0), - ('rlen', 'H', 4), - ('rdata', 's', '') - ) - - def pack_rdata(self, off, label_ptrs): - # XXX - yeah, this sux - if self.rdata: - return self.rdata - if self.type == DNS_A: - return self.ip - elif self.type == DNS_NS: - return pack_name(self.nsname, off, label_ptrs) - elif self.type == DNS_CNAME: - return pack_name(self.cname, off, label_ptrs) - elif self.type == DNS_PTR: - return pack_name(self.ptrname, off, label_ptrs) - elif self.type == DNS_SOA: - l = [] - l.append(pack_name(self.mname, off, label_ptrs)) - l.append(pack_name(self.rname, off + len(l[0]), label_ptrs)) - l.append(struct.pack('>IIIII', self.serial, self.refresh, - self.retry, self.expire, self.minimum)) - return ''.join(l) - elif self.type == DNS_MX: - return struct.pack('>H', self.preference) + \ - pack_name(self.mxname, off + 2, label_ptrs) - elif self.type == DNS_TXT or self.type == DNS_HINFO: - return ''.join(['%s%s' % (chr(len(x)), x) - for x in self.text]) - elif self.type == DNS_AAAA: - return self.ip6 - elif self.type == DNS_SRV: - return struct.pack('>HHH', self.priority, self.weight, self.port) + \ - pack_name(self.srvname, off + 6, label_ptrs) - - def unpack_rdata(self, buf, off): - if self.type == DNS_A: - self.ip = self.rdata - elif self.type == DNS_NS: - self.nsname, off = unpack_name(buf, off) - elif self.type == DNS_CNAME: - self.cname, off = unpack_name(buf, off) - elif self.type == DNS_PTR: - self.ptrname, off = unpack_name(buf, off) - elif self.type == DNS_SOA: - self.mname, off = unpack_name(buf, off) - self.rname, off = unpack_name(buf, off) - self.serial, self.refresh, self.retry, self.expire, \ - self.minimum = struct.unpack('>IIIII', buf[off:off + 20]) - elif self.type == DNS_MX: - self.preference = struct.unpack('>H', self.rdata[:2]) - self.mxname, off = unpack_name(buf, off + 2) - elif self.type == DNS_TXT or self.type == DNS_HINFO: - self.text = [] - buf = self.rdata - while buf: - n = ord(buf[0]) - self.text.append(buf[1:1 + n]) - buf = buf[1 + n:] - elif self.type == DNS_AAAA: - self.ip6 = self.rdata - elif self.type == DNS_SRV: - self.priority, self.weight, self.port = struct.unpack('>HHH', self.rdata[:6]) - self.srvname, off = unpack_name(buf, off + 6) - - def pack_q(self, buf, q): - """Append packed DNS question and return buf.""" - return buf + pack_name(q.name, len(buf), self.label_ptrs) + struct.pack('>HH', q.type, q.cls) - - def unpack_q(self, buf, off): - """Return DNS question and new offset.""" - q = self.Q() - q.name, off = unpack_name(buf, off) - q.type, q.cls = struct.unpack('>HH', buf[off:off + 4]) - off += 4 - return q, off - - def pack_rr(self, buf, rr): - """Append packed DNS RR and return buf.""" - name = pack_name(rr.name, len(buf), self.label_ptrs) - rdata = rr.pack_rdata(len(buf) + len(name) + 10, self.label_ptrs) - return buf + name + struct.pack('>HHIH', rr.type, rr.cls, rr.ttl, len(rdata)) + rdata - - def unpack_rr(self, buf, off): - """Return DNS RR and new offset.""" - rr = self.RR() - rr.name, off = unpack_name(buf, off) - rr.type, rr.cls, rr.ttl, rdlen = struct.unpack('>HHIH', buf[off:off + 10]) - off += 10 - rr.rdata = buf[off:off + rdlen] - rr.unpack_rdata(buf, off) - off += rdlen - return rr, off - - def unpack(self, buf): - dpkt.Packet.unpack(self, buf) - off = self.__hdr_len__ - cnt = self.qd # FIXME: This relies on this being properly set somewhere else - self.qd = [] - for _ in range(cnt): - q, off = self.unpack_q(buf, off) - self.qd.append(q) - for x in ('an', 'ns', 'ar'): - cnt = getattr(self, x, 0) - setattr(self, x, []) - for _ in range(cnt): - rr, off = self.unpack_rr(buf, off) - getattr(self, x).append(rr) - self.data = '' - - def __len__(self): - # XXX - cop out - return len(str(self)) - - def __str__(self): - # XXX - compress names on the fly - self.label_ptrs = {} - buf = struct.pack(self.__hdr_fmt__, self.id, self.op, len(self.qd), - len(self.an), len(self.ns), len(self.ar)) - for q in self.qd: - buf = self.pack_q(buf, q) - for x in ('an', 'ns', 'ar'): - for rr in getattr(self, x): - buf = self.pack_rr(buf, rr) - del self.label_ptrs - return buf - - -def test_basic(): - from ip import IP - - s = 'E\x00\x02\x08\xc15\x00\x00\x80\x11\x92aBk0\x01Bk0w\x005\xc07\x01\xf4\xda\xc2d\xd2\x81\x80\x00\x01\x00\x03\x00\x0b\x00\x0b\x03www\x06google\x03com\x00\x00\x01\x00\x01\xc0\x0c\x00\x05\x00\x01\x00\x00\x03V\x00\x17\x03www\x06google\x06akadns\x03net\x00\xc0,\x00\x01\x00\x01\x00\x00\x01\xa3\x00\x04@\xe9\xabh\xc0,\x00\x01\x00\x01\x00\x00\x01\xa3\x00\x04@\xe9\xabc\xc07\x00\x02\x00\x01\x00\x00KG\x00\x0c\x04usw5\x04akam\xc0>\xc07\x00\x02\x00\x01\x00\x00KG\x00\x07\x04usw6\xc0t\xc07\x00\x02\x00\x01\x00\x00KG\x00\x07\x04usw7\xc0t\xc07\x00\x02\x00\x01\x00\x00KG\x00\x08\x05asia3\xc0t\xc07\x00\x02\x00\x01\x00\x00KG\x00\x05\x02za\xc07\xc07\x00\x02\x00\x01\x00\x00KG\x00\x0f\x02zc\x06akadns\x03org\x00\xc07\x00\x02\x00\x01\x00\x00KG\x00\x05\x02zf\xc07\xc07\x00\x02\x00\x01\x00\x00KG\x00\x05\x02zh\xc0\xd5\xc07\x00\x02\x00\x01\x00\x00KG\x00\x07\x04eur3\xc0t\xc07\x00\x02\x00\x01\x00\x00KG\x00\x07\x04use2\xc0t\xc07\x00\x02\x00\x01\x00\x00KG\x00\x07\x04use4\xc0t\xc0\xc1\x00\x01\x00\x01\x00\x00\xfb4\x00\x04\xd0\xb9\x84\xb0\xc0\xd2\x00\x01\x00\x01\x00\x001\x0c\x00\x04?\xf1\xc76\xc0\xed\x00\x01\x00\x01\x00\x00\xfb4\x00\x04?\xd7\xc6S\xc0\xfe\x00\x01\x00\x01\x00\x001\x0c\x00\x04?\xd00.\xc1\x0f\x00\x01\x00\x01\x00\x00\n\xdf\x00\x04\xc1-\x01g\xc1"\x00\x01\x00\x01\x00\x00\x101\x00\x04?\xd1\xaa\x88\xc15\x00\x01\x00\x01\x00\x00\r\x1a\x00\x04PCC\xb6\xc0o\x00\x01\x00\x01\x00\x00\x10\x7f\x00\x04?\xf1I\xd6\xc0\x87\x00\x01\x00\x01\x00\x00\n\xdf\x00\x04\xce\x84dl\xc0\x9a\x00\x01\x00\x01\x00\x00\n\xdf\x00\x04A\xcb\xea\x1b\xc0\xad\x00\x01\x00\x01\x00\x00\x0b)\x00\x04\xc1l\x9a\t' - ip = IP(s) - my_dns = DNS(ip.udp.data) - assert my_dns.qd[0].name == 'www.google.com' and my_dns.an[1].name == 'www.google.akadns.net' - s = '\x05\xf5\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00\x03www\x03cnn\x03com\x00\x00\x01\x00\x01' - my_dns = DNS(s) - assert s == str(my_dns) - - -def test_PTR(): - s = 'g\x02\x81\x80\x00\x01\x00\x01\x00\x03\x00\x00\x011\x011\x03211\x03141\x07in-addr\x04arpa\x00\x00\x0c\x00\x01\xc0\x0c\x00\x0c\x00\x01\x00\x00\r6\x00$\x07default\nv-umce-ifs\x05umnet\x05umich\x03edu\x00\xc0\x0e\x00\x02\x00\x01\x00\x00\r6\x00\r\x06shabby\x03ifs\xc0O\xc0\x0e\x00\x02\x00\x01\x00\x00\r6\x00\x0f\x0cfish-license\xc0m\xc0\x0e\x00\x02\x00\x01\x00\x00\r6\x00\x0b\x04dns2\x03itd\xc0O' - my_dns = DNS(s) - assert my_dns.qd[0].name == '1.1.211.141.in-addr.arpa' and \ - my_dns.an[0].ptrname == 'default.v-umce-ifs.umnet.umich.edu' and \ - my_dns.ns[0].nsname == 'shabby.ifs.umich.edu' and \ - my_dns.ns[1].ttl == 3382L and \ - my_dns.ns[2].nsname == 'dns2.itd.umich.edu' - assert s == str(my_dns) - - -def test_pack_name(): - # Empty name is \0 - x = pack_name('', 0, {}) - assert x == '\0' - - -def test_deprecated_methods(): - """Test deprecated methods. Note: when they are removed so should this test""" - s = 'g\x02\x81\x80\x00\x01\x00\x01\x00\x03\x00\x00\x011\x011\x03211\x03141\x07in-addr\x04arpa\x00\x00\x0c\x00\x01\xc0\x0c\x00\x0c\x00\x01\x00\x00\r6\x00$\x07default\nv-umce-ifs\x05umnet\x05umich\x03edu\x00\xc0\x0e\x00\x02\x00\x01\x00\x00\r6\x00\r\x06shabby\x03ifs\xc0O\xc0\x0e\x00\x02\x00\x01\x00\x00\r6\x00\x0f\x0cfish-license\xc0m\xc0\x0e\x00\x02\x00\x01\x00\x00\r6\x00\x0b\x04dns2\x03itd\xc0O' - my_dns = DNS(s) - my_dns.get_aa() - my_dns.get_opcode() - qr = my_dns.get_qr() - my_dns.set_qr(qr) - qr2 = my_dns.get_qr() - assert qr == qr2 - - -def test_deprecated_method_performance(): - """Test the performance hit for the deprecation decorator""" - from timeit import Timer - - s = 'g\x02\x81\x80\x00\x01\x00\x01\x00\x03\x00\x00\x011\x011\x03211\x03141\x07in-addr\x04arpa\x00\x00\x0c\x00\x01\xc0\x0c\x00\x0c\x00\x01\x00\x00\r6\x00$\x07default\nv-umce-ifs\x05umnet\x05umich\x03edu\x00\xc0\x0e\x00\x02\x00\x01\x00\x00\r6\x00\r\x06shabby\x03ifs\xc0O\xc0\x0e\x00\x02\x00\x01\x00\x00\r6\x00\x0f\x0cfish-license\xc0m\xc0\x0e\x00\x02\x00\x01\x00\x00\r6\x00\x0b\x04dns2\x03itd\xc0O' - my_dns = DNS(s) - t1 = Timer(lambda: my_dns.aa).timeit(10000) - t2 = Timer(my_dns.get_aa).timeit(10000) - print 'Performance of dns.aa vs. dns.get_aa(): %f %f' % (t1, t2) - - -if __name__ == '__main__': - # Runs all the test associated with this class/file - test_basic() - test_PTR() - test_pack_name() - test_deprecated_methods() - test_deprecated_method_performance() - print 'Tests Successful...' |