summaryrefslogtreecommitdiffstats
path: root/scripts/external_libs/dpkt-1.8.6/dpkt/ssl.py
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/external_libs/dpkt-1.8.6/dpkt/ssl.py')
-rw-r--r--scripts/external_libs/dpkt-1.8.6/dpkt/ssl.py560
1 files changed, 0 insertions, 560 deletions
diff --git a/scripts/external_libs/dpkt-1.8.6/dpkt/ssl.py b/scripts/external_libs/dpkt-1.8.6/dpkt/ssl.py
deleted file mode 100644
index d741a99e..00000000
--- a/scripts/external_libs/dpkt-1.8.6/dpkt/ssl.py
+++ /dev/null
@@ -1,560 +0,0 @@
-# $Id: ssl.py 90 2014-04-02 22:06:23Z andrewflnr@gmail.com $
-# Portion Copyright 2012 Google Inc. All rights reserved.
-
-"""Secure Sockets Layer / Transport Layer Security."""
-
-import dpkt
-import ssl_ciphersuites
-import struct
-import binascii
-import traceback
-import datetime
-
-#
-# Note from April 2011: cde...@gmail.com added code that parses SSL3/TLS messages more in depth.
-#
-# Jul 2012: afleenor@google.com modified and extended SSL support further.
-#
-
-
-class SSL2(dpkt.Packet):
- __hdr__ = (
- ('len', 'H', 0),
- ('msg', 's', ''),
- ('pad', 's', ''),
- )
- def unpack(self, buf):
- dpkt.Packet.unpack(self, buf)
- if self.len & 0x8000:
- n = self.len = self.len & 0x7FFF
- self.msg, self.data = self.data[:n], self.data[n:]
- else:
- n = self.len = self.len & 0x3FFF
- padlen = ord(self.data[0])
- self.msg = self.data[1:1+n]
- self.pad = self.data[1+n:1+n+padlen]
- self.data = self.data[1+n+padlen:]
-
-
-# SSLv3/TLS versions
-SSL3_V = 0x0300
-TLS1_V = 0x0301
-TLS11_V = 0x0302
-TLS12_V = 0x0303
-
-ssl3_versions_str = {
- SSL3_V: 'SSL3',
- TLS1_V: 'TLS 1.0',
- TLS11_V: 'TLS 1.1',
- TLS12_V: 'TLS 1.2'
-}
-
-SSL3_VERSION_BYTES = set(('\x03\x00', '\x03\x01', '\x03\x02', '\x03\x03'))
-
-
-# Alert levels
-SSL3_AD_WARNING = 1
-SSL3_AD_FATAL = 2
-alert_level_str = {
- SSL3_AD_WARNING: 'SSL3_AD_WARNING',
- SSL3_AD_FATAL: 'SSL3_AD_FATAL'
-}
-
-# SSL3 alert descriptions
-SSL3_AD_CLOSE_NOTIFY = 0
-SSL3_AD_UNEXPECTED_MESSAGE = 10 # fatal
-SSL3_AD_BAD_RECORD_MAC = 20 # fatal
-SSL3_AD_DECOMPRESSION_FAILURE = 30 # fatal
-SSL3_AD_HANDSHAKE_FAILURE = 40 # fatal
-SSL3_AD_NO_CERTIFICATE = 41
-SSL3_AD_BAD_CERTIFICATE = 42
-SSL3_AD_UNSUPPORTED_CERTIFICATE = 43
-SSL3_AD_CERTIFICATE_REVOKED = 44
-SSL3_AD_CERTIFICATE_EXPIRED = 45
-SSL3_AD_CERTIFICATE_UNKNOWN = 46
-SSL3_AD_ILLEGAL_PARAMETER = 47 # fatal
-
-# TLS1 alert descriptions
-TLS1_AD_DECRYPTION_FAILED = 21
-TLS1_AD_RECORD_OVERFLOW = 22
-TLS1_AD_UNKNOWN_CA = 48 # fatal
-TLS1_AD_ACCESS_DENIED = 49 # fatal
-TLS1_AD_DECODE_ERROR = 50 # fatal
-TLS1_AD_DECRYPT_ERROR = 51
-TLS1_AD_EXPORT_RESTRICTION = 60 # fatal
-TLS1_AD_PROTOCOL_VERSION = 70 # fatal
-TLS1_AD_INSUFFICIENT_SECURITY = 71 # fatal
-TLS1_AD_INTERNAL_ERROR = 80 # fatal
-TLS1_AD_USER_CANCELLED = 90
-TLS1_AD_NO_RENEGOTIATION = 100
-#/* codes 110-114 are from RFC3546 */
-TLS1_AD_UNSUPPORTED_EXTENSION = 110
-TLS1_AD_CERTIFICATE_UNOBTAINABLE = 111
-TLS1_AD_UNRECOGNIZED_NAME = 112
-TLS1_AD_BAD_CERTIFICATE_STATUS_RESPONSE = 113
-TLS1_AD_BAD_CERTIFICATE_HASH_VALUE = 114
-TLS1_AD_UNKNOWN_PSK_IDENTITY = 115 # fatal
-
-
-# Mapping alert types to strings
-alert_description_str = {
- SSL3_AD_CLOSE_NOTIFY: 'SSL3_AD_CLOSE_NOTIFY',
- SSL3_AD_UNEXPECTED_MESSAGE: 'SSL3_AD_UNEXPECTED_MESSAGE',
- SSL3_AD_BAD_RECORD_MAC: 'SSL3_AD_BAD_RECORD_MAC',
- SSL3_AD_DECOMPRESSION_FAILURE: 'SSL3_AD_DECOMPRESSION_FAILURE',
- SSL3_AD_HANDSHAKE_FAILURE: 'SSL3_AD_HANDSHAKE_FAILURE',
- SSL3_AD_NO_CERTIFICATE: 'SSL3_AD_NO_CERTIFICATE',
- SSL3_AD_BAD_CERTIFICATE: 'SSL3_AD_BAD_CERTIFICATE',
- SSL3_AD_UNSUPPORTED_CERTIFICATE: 'SSL3_AD_UNSUPPORTED_CERTIFICATE',
- SSL3_AD_CERTIFICATE_REVOKED: 'SSL3_AD_CERTIFICATE_REVOKED',
- SSL3_AD_CERTIFICATE_EXPIRED: 'SSL3_AD_CERTIFICATE_EXPIRED',
- SSL3_AD_CERTIFICATE_UNKNOWN: 'SSL3_AD_CERTIFICATE_UNKNOWN',
- SSL3_AD_ILLEGAL_PARAMETER: 'SSL3_AD_ILLEGAL_PARAMETER',
- TLS1_AD_DECRYPTION_FAILED: 'TLS1_AD_DECRYPTION_FAILED',
- TLS1_AD_RECORD_OVERFLOW: 'TLS1_AD_RECORD_OVERFLOW',
- TLS1_AD_UNKNOWN_CA: 'TLS1_AD_UNKNOWN_CA',
- TLS1_AD_ACCESS_DENIED: 'TLS1_AD_ACCESS_DENIED',
- TLS1_AD_DECODE_ERROR: 'TLS1_AD_DECODE_ERROR',
- TLS1_AD_DECRYPT_ERROR: 'TLS1_AD_DECRYPT_ERROR',
- TLS1_AD_EXPORT_RESTRICTION: 'TLS1_AD_EXPORT_RESTRICTION',
- TLS1_AD_PROTOCOL_VERSION: 'TLS1_AD_PROTOCOL_VERSION',
- TLS1_AD_INSUFFICIENT_SECURITY: 'TLS1_AD_INSUFFICIENT_SECURITY',
- TLS1_AD_INTERNAL_ERROR: 'TLS1_AD_INTERNAL_ERROR',
- TLS1_AD_USER_CANCELLED: 'TLS1_AD_USER_CANCELLED',
- TLS1_AD_NO_RENEGOTIATION: 'TLS1_AD_NO_RENEGOTIATION',
- TLS1_AD_UNSUPPORTED_EXTENSION: 'TLS1_AD_UNSUPPORTED_EXTENSION',
- TLS1_AD_CERTIFICATE_UNOBTAINABLE: 'TLS1_AD_CERTIFICATE_UNOBTAINABLE',
- TLS1_AD_UNRECOGNIZED_NAME: 'TLS1_AD_UNRECOGNIZED_NAME',
- TLS1_AD_BAD_CERTIFICATE_STATUS_RESPONSE: 'TLS1_AD_BAD_CERTIFICATE_STATUS_RESPONSE',
- TLS1_AD_BAD_CERTIFICATE_HASH_VALUE: 'TLS1_AD_BAD_CERTIFICATE_HASH_VALUE',
- TLS1_AD_UNKNOWN_PSK_IDENTITY: 'TLS1_AD_UNKNOWN_PSK_IDENTITY'
-}
-
-
-# struct format strings for parsing buffer lengths
-# don't forget, you have to pad a 3-byte value with \x00
-_SIZE_FORMATS = ['!B', '!H', '!I', '!I']
-
-def parse_variable_array(buf, lenbytes):
- """
- Parse an array described using the 'Type name<x..y>' syntax from the spec
-
- Read a length at the start of buf, and returns that many bytes
- after, in a tuple with the TOTAL bytes consumed (including the size). This
- does not check that the array is the right length for any given datatype.
- """
- # first have to figure out how to parse length
- assert lenbytes <= 4 # pretty sure 4 is impossible, too
- size_format = _SIZE_FORMATS[lenbytes - 1]
- padding = '\x00' if lenbytes == 3 else ''
- # read off the length
- size = struct.unpack(size_format, padding + buf[:lenbytes])[0]
- # read the actual data
- data = buf[lenbytes:lenbytes + size]
- # if len(data) != size: insufficient data
- return data, size + lenbytes
-
-
-class SSL3Exception(Exception):
- pass
-
-
-class TLSRecord(dpkt.Packet):
- """
- SSLv3 or TLSv1+ packet.
-
- In addition to the fields specified in the header, there are
- compressed and decrypted fields, indicating whether, in the language
- of the spec, this is a TLSPlaintext, TLSCompressed, or
- TLSCiphertext. The application will have to figure out when it's
- appropriate to change these values.
- """
-
- __hdr__ = (
- ('type', 'B', 0),
- ('version', 'H', 0),
- ('length', 'H', 0),
- )
-
- def __init__(self, *args, **kwargs):
- # assume plaintext unless specified otherwise in arguments
- self.compressed = kwargs.pop('compressed', False)
- self.encrypted = kwargs.pop('encrypted', False)
- # parent constructor
- dpkt.Packet.__init__(self, *args, **kwargs)
- # make sure length and data are consistent
- self.length = len(self.data)
-
- def unpack(self, buf):
- dpkt.Packet.unpack(self, buf)
- header_length = self.__hdr_len__
- self.data = buf[header_length:header_length+self.length]
- # make sure buffer was long enough
- if len(self.data) != self.length:
- raise dpkt.NeedData('TLSRecord data was too short.')
- # assume compressed and encrypted when it's been parsed from
- # raw data
- self.compressed = True
- self.encrypted = True
-
-
-class TLSChangeCipherSpec(dpkt.Packet):
- """
- ChangeCipherSpec message is just a single byte with value 1
- """
- __hdr__ = (('type', 'B', 1),)
-
-
-class TLSAppData(str):
- """
- As far as TLSRecord is concerned, AppData is just an opaque blob.
- """
- pass
-
-
-class TLSAlert(dpkt.Packet):
-
- __hdr__ = (
- ('level', 'B', 1),
- ('description', 'B', 0),
- )
-
-
-class TLSHelloRequest(dpkt.Packet):
- __hdr__ = tuple()
-
-
-class TLSClientHello(dpkt.Packet):
- __hdr__ = (
- ('version', 'H', 0x0301),
- ('random', '32s', '\x00'*32),
- ) # the rest is variable-length and has to be done manually
-
- def unpack(self, buf):
- dpkt.Packet.unpack(self, buf)
- # now session, cipher suites, extensions are in self.data
- self.session_id, pointer = parse_variable_array(self.data, 1)
-# print 'pointer',pointer
- # handle ciphersuites
- ciphersuites, parsed = parse_variable_array(self.data[pointer:], 2)
- pointer += parsed
- self.num_ciphersuites = len(ciphersuites) / 2
- # check len(ciphersuites) % 2 == 0 ?
- # compression methods
- compression_methods, parsed = parse_variable_array(
- self.data[pointer:], 1)
- pointer += parsed
- self.num_compression_methods = parsed - 1
- self.compression_methods = map(ord, compression_methods)
- # extensions
-
-
-class TLSServerHello(dpkt.Packet):
- __hdr__ = (
- ('version', 'H', '0x0301'),
- ('random', '32s', '\x00'*32),
- ) # session is variable, forcing rest to be manual
-
- def unpack(self, buf):
- try:
- dpkt.Packet.unpack(self, buf)
- self.session_id, pointer = parse_variable_array(self.data, 1)
- # single cipher suite
- self.cipher_suite = struct.unpack('!H', self.data[pointer:pointer+2])[0]
- pointer += 2
- # single compression method
- self.compression = struct.unpack('!B', self.data[pointer:pointer+1])[0]
- pointer += 1
- # ignore extensions for now
- except struct.error:
- # probably data too short
- raise dpkt.NeedData
-
-
-class TLSUnknownHandshake(dpkt.Packet):
- __hdr__ = tuple()
-
-TLSCertificate = TLSUnknownHandshake
-TLSServerKeyExchange = TLSUnknownHandshake
-TLSCertificateRequest = TLSUnknownHandshake
-TLSServerHelloDone = TLSUnknownHandshake
-TLSCertificateVerify = TLSUnknownHandshake
-TLSClientKeyExchange = TLSUnknownHandshake
-TLSFinished = TLSUnknownHandshake
-
-
-# mapping of handshake type ids to their names
-# and the classes that implement them
-HANDSHAKE_TYPES = {
- 0: ('HelloRequest', TLSHelloRequest),
- 1: ('ClientHello', TLSClientHello),
- 2: ('ServerHello', TLSServerHello),
- 11: ('Certificate', TLSCertificate),
- 12: ('ServerKeyExchange', TLSServerKeyExchange),
- 13: ('CertificateRequest', TLSCertificateRequest),
- 14: ('ServerHelloDone', TLSServerHelloDone),
- 15: ('CertificateVerify', TLSCertificateVerify),
- 16: ('ClientKeyExchange', TLSClientKeyExchange),
- 20: ('Finished', TLSFinished),
-}
-
-
-class TLSHandshake(dpkt.Packet):
- '''
- A TLS Handshake message
-
- This goes for all messages encapsulated in the Record layer, but especially
- important for handshakes and app data: A message may be spread across a
- number of TLSRecords, in addition to the possibility of there being more
- than one in a given Record. You have to put together the contents of
- TLSRecord's yourself.
- '''
-
- # struct.unpack can't handle the 3-byte int, so we parse it as bytes
- # (and store it as bytes so dpkt doesn't get confused), and turn it into
- # an int in a user-facing property
- __hdr__ = (
- ('type', 'B', 0),
- ('length_bytes', '3s', 0),
- )
-
- def unpack(self, buf):
- dpkt.Packet.unpack(self, buf)
- # Wait, might there be more than one message of self.type?
- embedded_type = HANDSHAKE_TYPES.get(self.type, None)
- if embedded_type is None:
- raise SSL3Exception('Unknown or invalid handshake type %d' %
- self.type)
- # only take the right number of bytes
- self.data = self.data[:self.length]
- if len(self.data) != self.length:
- raise dpkt.NeedData
- # get class out of embedded_type tuple
- self.data = embedded_type[1](self.data)
-
- @property
- def length(self):
- return struct.unpack('!I', '\x00' + self.length_bytes)[0]
-
-
-RECORD_TYPES = {
- 20: TLSChangeCipherSpec,
- 21: TLSAlert,
- 22: TLSHandshake,
- 23: TLSAppData,
-}
-
-
-class SSLFactory(object):
- def __new__(cls, buf):
- v = buf[1:3]
- if v in [ '\x03\x00', '\x03\x01', '\x03\x02' ]:
- return SSL3(buf)
- # SSL2 has no characteristic header or magic bytes, so we just assume
- # that the msg is an SSL2 msg if it is not detected as SSL3+
- return SSL2(buf)
-
-
-def TLSMultiFactory(buf):
- '''
- Attempt to parse one or more TLSRecord's out of buf
-
- Args:
- buf: string containing SSL/TLS messages. May have an incomplete record
- on the end
-
- Returns:
- [TLSRecord]
- int, total bytes consumed, != len(buf) if an incomplete record was left at
- the end.
-
- Raises SSL3Exception.
- '''
- i, n = 0, len(buf)
- msgs = []
- while i < n:
- v = buf[i+1:i+3]
- if v in SSL3_VERSION_BYTES:
- try:
- msg = TLSRecord(buf[i:])
- msgs.append(msg)
- except dpkt.NeedData:
- break
- else:
- raise SSL3Exception('Bad TLS version in buf: %r' % buf[i:i+5])
- i += len(msg)
- return msgs, i
-
-
-import unittest
-
-
-_hexdecode = binascii.a2b_hex
-
-
-class TLSRecordTest(unittest.TestCase):
- """
- Test basic TLSRecord functionality
-
- For this test, the contents of the record doesn't matter, since we're not
- parsing the next layer.
- """
- def setUp(self):
- # add some extra data, to make sure length is parsed correctly
- self.p = TLSRecord('\x17\x03\x01\x00\x08abcdefghzzzzzzzzzzz')
- def testContentType(self):
- self.assertEqual(self.p.type, 23)
- def testVersion(self):
- self.assertEqual(self.p.version, 0x0301)
- def testLength(self):
- self.assertEqual(self.p.length, 8)
- def testData(self):
- self.assertEqual(self.p.data, 'abcdefgh')
- def testInitialFlags(self):
- self.assertTrue(self.p.compressed)
- self.assertTrue(self.p.encrypted)
- def testRepack(self):
- p2 = TLSRecord(type=23, version=0x0301, data='abcdefgh')
- self.assertEqual(p2.type, 23)
- self.assertEqual(p2.version, 0x0301)
- self.assertEqual(p2.length, 8)
- self.assertEqual(p2.data, 'abcdefgh')
- self.assertEqual(p2.pack(), self.p.pack())
- def testTotalLength(self):
- # that len(p) includes header
- self.assertEqual(len(self.p), 13)
- def testRaisesNeedDataWhenBufIsShort(self):
- self.assertRaises(
- dpkt.NeedData,
- TLSRecord,
- '\x16\x03\x01\x00\x10abc')
-
-
-class TLSChangeCipherSpecTest(unittest.TestCase):
- "It's just a byte. This will be quick, I promise"
- def setUp(self):
- self.p = TLSChangeCipherSpec('\x01')
- def testParses(self):
- self.assertEqual(self.p.type, 1)
- def testTotalLength(self):
- self.assertEqual(len(self.p), 1)
-
-
-class TLSAppDataTest(unittest.TestCase):
- "AppData is basically just a string"
- def testValue(self):
- d = TLSAppData('abcdefgh')
- self.assertEqual(d, 'abcdefgh')
-
-
-class TLSHandshakeTest(unittest.TestCase):
- def setUp(self):
- self.h = TLSHandshake('\x00\x00\x00\x01\xff')
- def testCreatedInsideMessage(self):
- self.assertTrue(isinstance(self.h.data, TLSHelloRequest))
- def testLength(self):
- self.assertEqual(self.h.length, 0x01)
- def testRaisesNeedData(self):
- self.assertRaises(dpkt.NeedData, TLSHandshake, '\x00\x00\x01\x01')
-
-
-class ClientHelloTest(unittest.TestCase):
- 'This data is extracted from and verified by Wireshark'
-
- def setUp(self):
- self.data = _hexdecode(
- "01000199" # handshake header
- "0301" # version
- "5008220ce5e0e78b6891afe204498c9363feffbe03235a2d9e05b7d990eb708d" # rand
- "2009bc0192e008e6fa8fe47998fca91311ba30ddde14a9587dc674b11c3d3e5ed1" # session id
- # cipher suites
- "005400ffc00ac0140088008700390038c00fc00500840035c007c009c011c0130045004400330032c00cc00ec002c0040096004100050004002fc008c01200160013c00dc003feff000ac006c010c00bc00100020001"
- "0100" # compresssion methods
- # extensions
- "00fc0000000e000c0000096c6f63616c686f7374000a00080006001700180019000b00020100002300d0a50b2e9f618a9ea9bf493ef49b421835cd2f6b05bbe1179d8edf70d58c33d656e8696d36d7e7e0b9d3ecc0e4de339552fa06c64c0fcb550a334bc43944e2739ca342d15a9ebbe981ac87a0d38160507d47af09bdc16c5f0ee4cdceea551539382333226048a026d3a90a0535f4a64236467db8fee22b041af986ad0f253bc369137cd8d8cd061925461d7f4d7895ca9a4181ab554dad50360ac31860e971483877c9335ac1300c5e78f3e56f3b8e0fc16358fcaceefd5c8d8aaae7b35be116f8832856ca61144fcdd95e071b94d0cf7233740000"
- "FFFFFFFFFFFFFFFF") # random garbage
- self.p = TLSHandshake(self.data)
-
- def testClientHelloConstructed(self):
- 'Make sure the correct class was constructed'
- #print self.p
- self.assertTrue(isinstance(self.p.data, TLSClientHello))
-
-# def testClientDateCorrect(self):
-# self.assertEqual(self.p.random_unixtime, 1342710284)
-
- def testClientRandomCorrect(self):
- self.assertEqual(self.p.data.random,
- _hexdecode('5008220ce5e0e78b6891afe204498c9363feffbe03235a2d9e05b7d990eb708d'))
-
- def testCipherSuiteLength(self):
- # we won't bother testing the identity of each cipher suite in the list.
- self.assertEqual(self.p.data.num_ciphersuites, 42)
- #self.assertEqual(len(self.p.ciphersuites), 42)
-
- def testSessionId(self):
- self.assertEqual(self.p.data.session_id,
- _hexdecode('09bc0192e008e6fa8fe47998fca91311ba30ddde14a9587dc674b11c3d3e5ed1'))
-
- def testCompressionMethods(self):
- self.assertEqual(self.p.data.num_compression_methods, 1)
-
- def testTotalLength(self):
- self.assertEqual(len(self.p), 413)
-
-
-class ServerHelloTest(unittest.TestCase):
- 'Again, from Wireshark'
-
- def setUp(self):
- self.data = _hexdecode('0200004d03015008220c8ec43c5462315a7c99f5d5b6bff009ad285b51dc18485f352e9fdecd2009bc0192e008e6fa8fe47998fca91311ba30ddde14a9587dc674b11c3d3e5ed10002000005ff01000100')
- self.p = TLSHandshake(self.data)
-
- def testConstructed(self):
- self.assertTrue(isinstance(self.p.data, TLSServerHello))
-
-# def testDateCorrect(self):
-# self.assertEqual(self.p.random_unixtime, 1342710284)
-
- def testRandomCorrect(self):
- self.assertEqual(self.p.data.random,
- _hexdecode('5008220c8ec43c5462315a7c99f5d5b6bff009ad285b51dc18485f352e9fdecd'))
-
- def testCipherSuite(self):
- self.assertEqual(
- ssl_ciphersuites.BY_CODE[self.p.data.cipher_suite].name,
- 'TLS_RSA_WITH_NULL_SHA')
-
- def testTotalLength(self):
- self.assertEqual(len(self.p), 81)
-
-
-class TLSMultiFactoryTest(unittest.TestCase):
- "Made up test data"
-
- def setUp(self):
- self.data = _hexdecode('1703010010' # header 1
- 'AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA' # data 1
- '1703010010' # header 2
- 'BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB' # data 2
- '1703010010' # header 3
- 'CCCCCCCC') # data 3 (incomplete)
- self.msgs, self.bytes_parsed = TLSMultiFactory(self.data)
-
- def testNumMessages(self):
- # only complete messages should be parsed, incomplete ones left
- # in buffer
- self.assertEqual(len(self.msgs), 2)
-
- def testBytesParsed(self):
- self.assertEqual(self.bytes_parsed, (5 + 16) * 2)
-
- def testFirstMsgData(self):
- self.assertEqual(self.msgs[0].data, _hexdecode('AA' * 16))
-
- def testSecondMsgData(self):
- self.assertEqual(self.msgs[1].data, _hexdecode('BB' * 16))
-
-
-if __name__ == '__main__':
- unittest.main()