summaryrefslogtreecommitdiffstats
path: root/scripts/external_libs/dpkt-1.8.6/dpkt/ssl.py
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/external_libs/dpkt-1.8.6/dpkt/ssl.py')
-rw-r--r--scripts/external_libs/dpkt-1.8.6/dpkt/ssl.py560
1 files changed, 560 insertions, 0 deletions
diff --git a/scripts/external_libs/dpkt-1.8.6/dpkt/ssl.py b/scripts/external_libs/dpkt-1.8.6/dpkt/ssl.py
new file mode 100644
index 00000000..d741a99e
--- /dev/null
+++ b/scripts/external_libs/dpkt-1.8.6/dpkt/ssl.py
@@ -0,0 +1,560 @@
+# $Id: ssl.py 90 2014-04-02 22:06:23Z andrewflnr@gmail.com $
+# Portion Copyright 2012 Google Inc. All rights reserved.
+
+"""Secure Sockets Layer / Transport Layer Security."""
+
+import dpkt
+import ssl_ciphersuites
+import struct
+import binascii
+import traceback
+import datetime
+
+#
+# Note from April 2011: cde...@gmail.com added code that parses SSL3/TLS messages more in depth.
+#
+# Jul 2012: afleenor@google.com modified and extended SSL support further.
+#
+
+
+class SSL2(dpkt.Packet):
+ __hdr__ = (
+ ('len', 'H', 0),
+ ('msg', 's', ''),
+ ('pad', 's', ''),
+ )
+ def unpack(self, buf):
+ dpkt.Packet.unpack(self, buf)
+ if self.len & 0x8000:
+ n = self.len = self.len & 0x7FFF
+ self.msg, self.data = self.data[:n], self.data[n:]
+ else:
+ n = self.len = self.len & 0x3FFF
+ padlen = ord(self.data[0])
+ self.msg = self.data[1:1+n]
+ self.pad = self.data[1+n:1+n+padlen]
+ self.data = self.data[1+n+padlen:]
+
+
+# SSLv3/TLS versions
+SSL3_V = 0x0300
+TLS1_V = 0x0301
+TLS11_V = 0x0302
+TLS12_V = 0x0303
+
+ssl3_versions_str = {
+ SSL3_V: 'SSL3',
+ TLS1_V: 'TLS 1.0',
+ TLS11_V: 'TLS 1.1',
+ TLS12_V: 'TLS 1.2'
+}
+
+SSL3_VERSION_BYTES = set(('\x03\x00', '\x03\x01', '\x03\x02', '\x03\x03'))
+
+
+# Alert levels
+SSL3_AD_WARNING = 1
+SSL3_AD_FATAL = 2
+alert_level_str = {
+ SSL3_AD_WARNING: 'SSL3_AD_WARNING',
+ SSL3_AD_FATAL: 'SSL3_AD_FATAL'
+}
+
+# SSL3 alert descriptions
+SSL3_AD_CLOSE_NOTIFY = 0
+SSL3_AD_UNEXPECTED_MESSAGE = 10 # fatal
+SSL3_AD_BAD_RECORD_MAC = 20 # fatal
+SSL3_AD_DECOMPRESSION_FAILURE = 30 # fatal
+SSL3_AD_HANDSHAKE_FAILURE = 40 # fatal
+SSL3_AD_NO_CERTIFICATE = 41
+SSL3_AD_BAD_CERTIFICATE = 42
+SSL3_AD_UNSUPPORTED_CERTIFICATE = 43
+SSL3_AD_CERTIFICATE_REVOKED = 44
+SSL3_AD_CERTIFICATE_EXPIRED = 45
+SSL3_AD_CERTIFICATE_UNKNOWN = 46
+SSL3_AD_ILLEGAL_PARAMETER = 47 # fatal
+
+# TLS1 alert descriptions
+TLS1_AD_DECRYPTION_FAILED = 21
+TLS1_AD_RECORD_OVERFLOW = 22
+TLS1_AD_UNKNOWN_CA = 48 # fatal
+TLS1_AD_ACCESS_DENIED = 49 # fatal
+TLS1_AD_DECODE_ERROR = 50 # fatal
+TLS1_AD_DECRYPT_ERROR = 51
+TLS1_AD_EXPORT_RESTRICTION = 60 # fatal
+TLS1_AD_PROTOCOL_VERSION = 70 # fatal
+TLS1_AD_INSUFFICIENT_SECURITY = 71 # fatal
+TLS1_AD_INTERNAL_ERROR = 80 # fatal
+TLS1_AD_USER_CANCELLED = 90
+TLS1_AD_NO_RENEGOTIATION = 100
+#/* codes 110-114 are from RFC3546 */
+TLS1_AD_UNSUPPORTED_EXTENSION = 110
+TLS1_AD_CERTIFICATE_UNOBTAINABLE = 111
+TLS1_AD_UNRECOGNIZED_NAME = 112
+TLS1_AD_BAD_CERTIFICATE_STATUS_RESPONSE = 113
+TLS1_AD_BAD_CERTIFICATE_HASH_VALUE = 114
+TLS1_AD_UNKNOWN_PSK_IDENTITY = 115 # fatal
+
+
+# Mapping alert types to strings
+alert_description_str = {
+ SSL3_AD_CLOSE_NOTIFY: 'SSL3_AD_CLOSE_NOTIFY',
+ SSL3_AD_UNEXPECTED_MESSAGE: 'SSL3_AD_UNEXPECTED_MESSAGE',
+ SSL3_AD_BAD_RECORD_MAC: 'SSL3_AD_BAD_RECORD_MAC',
+ SSL3_AD_DECOMPRESSION_FAILURE: 'SSL3_AD_DECOMPRESSION_FAILURE',
+ SSL3_AD_HANDSHAKE_FAILURE: 'SSL3_AD_HANDSHAKE_FAILURE',
+ SSL3_AD_NO_CERTIFICATE: 'SSL3_AD_NO_CERTIFICATE',
+ SSL3_AD_BAD_CERTIFICATE: 'SSL3_AD_BAD_CERTIFICATE',
+ SSL3_AD_UNSUPPORTED_CERTIFICATE: 'SSL3_AD_UNSUPPORTED_CERTIFICATE',
+ SSL3_AD_CERTIFICATE_REVOKED: 'SSL3_AD_CERTIFICATE_REVOKED',
+ SSL3_AD_CERTIFICATE_EXPIRED: 'SSL3_AD_CERTIFICATE_EXPIRED',
+ SSL3_AD_CERTIFICATE_UNKNOWN: 'SSL3_AD_CERTIFICATE_UNKNOWN',
+ SSL3_AD_ILLEGAL_PARAMETER: 'SSL3_AD_ILLEGAL_PARAMETER',
+ TLS1_AD_DECRYPTION_FAILED: 'TLS1_AD_DECRYPTION_FAILED',
+ TLS1_AD_RECORD_OVERFLOW: 'TLS1_AD_RECORD_OVERFLOW',
+ TLS1_AD_UNKNOWN_CA: 'TLS1_AD_UNKNOWN_CA',
+ TLS1_AD_ACCESS_DENIED: 'TLS1_AD_ACCESS_DENIED',
+ TLS1_AD_DECODE_ERROR: 'TLS1_AD_DECODE_ERROR',
+ TLS1_AD_DECRYPT_ERROR: 'TLS1_AD_DECRYPT_ERROR',
+ TLS1_AD_EXPORT_RESTRICTION: 'TLS1_AD_EXPORT_RESTRICTION',
+ TLS1_AD_PROTOCOL_VERSION: 'TLS1_AD_PROTOCOL_VERSION',
+ TLS1_AD_INSUFFICIENT_SECURITY: 'TLS1_AD_INSUFFICIENT_SECURITY',
+ TLS1_AD_INTERNAL_ERROR: 'TLS1_AD_INTERNAL_ERROR',
+ TLS1_AD_USER_CANCELLED: 'TLS1_AD_USER_CANCELLED',
+ TLS1_AD_NO_RENEGOTIATION: 'TLS1_AD_NO_RENEGOTIATION',
+ TLS1_AD_UNSUPPORTED_EXTENSION: 'TLS1_AD_UNSUPPORTED_EXTENSION',
+ TLS1_AD_CERTIFICATE_UNOBTAINABLE: 'TLS1_AD_CERTIFICATE_UNOBTAINABLE',
+ TLS1_AD_UNRECOGNIZED_NAME: 'TLS1_AD_UNRECOGNIZED_NAME',
+ TLS1_AD_BAD_CERTIFICATE_STATUS_RESPONSE: 'TLS1_AD_BAD_CERTIFICATE_STATUS_RESPONSE',
+ TLS1_AD_BAD_CERTIFICATE_HASH_VALUE: 'TLS1_AD_BAD_CERTIFICATE_HASH_VALUE',
+ TLS1_AD_UNKNOWN_PSK_IDENTITY: 'TLS1_AD_UNKNOWN_PSK_IDENTITY'
+}
+
+
+# struct format strings for parsing buffer lengths
+# don't forget, you have to pad a 3-byte value with \x00
+_SIZE_FORMATS = ['!B', '!H', '!I', '!I']
+
+def parse_variable_array(buf, lenbytes):
+ """
+ Parse an array described using the 'Type name<x..y>' syntax from the spec
+
+ Read a length at the start of buf, and returns that many bytes
+ after, in a tuple with the TOTAL bytes consumed (including the size). This
+ does not check that the array is the right length for any given datatype.
+ """
+ # first have to figure out how to parse length
+ assert lenbytes <= 4 # pretty sure 4 is impossible, too
+ size_format = _SIZE_FORMATS[lenbytes - 1]
+ padding = '\x00' if lenbytes == 3 else ''
+ # read off the length
+ size = struct.unpack(size_format, padding + buf[:lenbytes])[0]
+ # read the actual data
+ data = buf[lenbytes:lenbytes + size]
+ # if len(data) != size: insufficient data
+ return data, size + lenbytes
+
+
+class SSL3Exception(Exception):
+ pass
+
+
+class TLSRecord(dpkt.Packet):
+ """
+ SSLv3 or TLSv1+ packet.
+
+ In addition to the fields specified in the header, there are
+ compressed and decrypted fields, indicating whether, in the language
+ of the spec, this is a TLSPlaintext, TLSCompressed, or
+ TLSCiphertext. The application will have to figure out when it's
+ appropriate to change these values.
+ """
+
+ __hdr__ = (
+ ('type', 'B', 0),
+ ('version', 'H', 0),
+ ('length', 'H', 0),
+ )
+
+ def __init__(self, *args, **kwargs):
+ # assume plaintext unless specified otherwise in arguments
+ self.compressed = kwargs.pop('compressed', False)
+ self.encrypted = kwargs.pop('encrypted', False)
+ # parent constructor
+ dpkt.Packet.__init__(self, *args, **kwargs)
+ # make sure length and data are consistent
+ self.length = len(self.data)
+
+ def unpack(self, buf):
+ dpkt.Packet.unpack(self, buf)
+ header_length = self.__hdr_len__
+ self.data = buf[header_length:header_length+self.length]
+ # make sure buffer was long enough
+ if len(self.data) != self.length:
+ raise dpkt.NeedData('TLSRecord data was too short.')
+ # assume compressed and encrypted when it's been parsed from
+ # raw data
+ self.compressed = True
+ self.encrypted = True
+
+
+class TLSChangeCipherSpec(dpkt.Packet):
+ """
+ ChangeCipherSpec message is just a single byte with value 1
+ """
+ __hdr__ = (('type', 'B', 1),)
+
+
+class TLSAppData(str):
+ """
+ As far as TLSRecord is concerned, AppData is just an opaque blob.
+ """
+ pass
+
+
+class TLSAlert(dpkt.Packet):
+
+ __hdr__ = (
+ ('level', 'B', 1),
+ ('description', 'B', 0),
+ )
+
+
+class TLSHelloRequest(dpkt.Packet):
+ __hdr__ = tuple()
+
+
+class TLSClientHello(dpkt.Packet):
+ __hdr__ = (
+ ('version', 'H', 0x0301),
+ ('random', '32s', '\x00'*32),
+ ) # the rest is variable-length and has to be done manually
+
+ def unpack(self, buf):
+ dpkt.Packet.unpack(self, buf)
+ # now session, cipher suites, extensions are in self.data
+ self.session_id, pointer = parse_variable_array(self.data, 1)
+# print 'pointer',pointer
+ # handle ciphersuites
+ ciphersuites, parsed = parse_variable_array(self.data[pointer:], 2)
+ pointer += parsed
+ self.num_ciphersuites = len(ciphersuites) / 2
+ # check len(ciphersuites) % 2 == 0 ?
+ # compression methods
+ compression_methods, parsed = parse_variable_array(
+ self.data[pointer:], 1)
+ pointer += parsed
+ self.num_compression_methods = parsed - 1
+ self.compression_methods = map(ord, compression_methods)
+ # extensions
+
+
+class TLSServerHello(dpkt.Packet):
+ __hdr__ = (
+ ('version', 'H', '0x0301'),
+ ('random', '32s', '\x00'*32),
+ ) # session is variable, forcing rest to be manual
+
+ def unpack(self, buf):
+ try:
+ dpkt.Packet.unpack(self, buf)
+ self.session_id, pointer = parse_variable_array(self.data, 1)
+ # single cipher suite
+ self.cipher_suite = struct.unpack('!H', self.data[pointer:pointer+2])[0]
+ pointer += 2
+ # single compression method
+ self.compression = struct.unpack('!B', self.data[pointer:pointer+1])[0]
+ pointer += 1
+ # ignore extensions for now
+ except struct.error:
+ # probably data too short
+ raise dpkt.NeedData
+
+
+class TLSUnknownHandshake(dpkt.Packet):
+ __hdr__ = tuple()
+
+TLSCertificate = TLSUnknownHandshake
+TLSServerKeyExchange = TLSUnknownHandshake
+TLSCertificateRequest = TLSUnknownHandshake
+TLSServerHelloDone = TLSUnknownHandshake
+TLSCertificateVerify = TLSUnknownHandshake
+TLSClientKeyExchange = TLSUnknownHandshake
+TLSFinished = TLSUnknownHandshake
+
+
+# mapping of handshake type ids to their names
+# and the classes that implement them
+HANDSHAKE_TYPES = {
+ 0: ('HelloRequest', TLSHelloRequest),
+ 1: ('ClientHello', TLSClientHello),
+ 2: ('ServerHello', TLSServerHello),
+ 11: ('Certificate', TLSCertificate),
+ 12: ('ServerKeyExchange', TLSServerKeyExchange),
+ 13: ('CertificateRequest', TLSCertificateRequest),
+ 14: ('ServerHelloDone', TLSServerHelloDone),
+ 15: ('CertificateVerify', TLSCertificateVerify),
+ 16: ('ClientKeyExchange', TLSClientKeyExchange),
+ 20: ('Finished', TLSFinished),
+}
+
+
+class TLSHandshake(dpkt.Packet):
+ '''
+ A TLS Handshake message
+
+ This goes for all messages encapsulated in the Record layer, but especially
+ important for handshakes and app data: A message may be spread across a
+ number of TLSRecords, in addition to the possibility of there being more
+ than one in a given Record. You have to put together the contents of
+ TLSRecord's yourself.
+ '''
+
+ # struct.unpack can't handle the 3-byte int, so we parse it as bytes
+ # (and store it as bytes so dpkt doesn't get confused), and turn it into
+ # an int in a user-facing property
+ __hdr__ = (
+ ('type', 'B', 0),
+ ('length_bytes', '3s', 0),
+ )
+
+ def unpack(self, buf):
+ dpkt.Packet.unpack(self, buf)
+ # Wait, might there be more than one message of self.type?
+ embedded_type = HANDSHAKE_TYPES.get(self.type, None)
+ if embedded_type is None:
+ raise SSL3Exception('Unknown or invalid handshake type %d' %
+ self.type)
+ # only take the right number of bytes
+ self.data = self.data[:self.length]
+ if len(self.data) != self.length:
+ raise dpkt.NeedData
+ # get class out of embedded_type tuple
+ self.data = embedded_type[1](self.data)
+
+ @property
+ def length(self):
+ return struct.unpack('!I', '\x00' + self.length_bytes)[0]
+
+
+RECORD_TYPES = {
+ 20: TLSChangeCipherSpec,
+ 21: TLSAlert,
+ 22: TLSHandshake,
+ 23: TLSAppData,
+}
+
+
+class SSLFactory(object):
+ def __new__(cls, buf):
+ v = buf[1:3]
+ if v in [ '\x03\x00', '\x03\x01', '\x03\x02' ]:
+ return SSL3(buf)
+ # SSL2 has no characteristic header or magic bytes, so we just assume
+ # that the msg is an SSL2 msg if it is not detected as SSL3+
+ return SSL2(buf)
+
+
+def TLSMultiFactory(buf):
+ '''
+ Attempt to parse one or more TLSRecord's out of buf
+
+ Args:
+ buf: string containing SSL/TLS messages. May have an incomplete record
+ on the end
+
+ Returns:
+ [TLSRecord]
+ int, total bytes consumed, != len(buf) if an incomplete record was left at
+ the end.
+
+ Raises SSL3Exception.
+ '''
+ i, n = 0, len(buf)
+ msgs = []
+ while i < n:
+ v = buf[i+1:i+3]
+ if v in SSL3_VERSION_BYTES:
+ try:
+ msg = TLSRecord(buf[i:])
+ msgs.append(msg)
+ except dpkt.NeedData:
+ break
+ else:
+ raise SSL3Exception('Bad TLS version in buf: %r' % buf[i:i+5])
+ i += len(msg)
+ return msgs, i
+
+
+import unittest
+
+
+_hexdecode = binascii.a2b_hex
+
+
+class TLSRecordTest(unittest.TestCase):
+ """
+ Test basic TLSRecord functionality
+
+ For this test, the contents of the record doesn't matter, since we're not
+ parsing the next layer.
+ """
+ def setUp(self):
+ # add some extra data, to make sure length is parsed correctly
+ self.p = TLSRecord('\x17\x03\x01\x00\x08abcdefghzzzzzzzzzzz')
+ def testContentType(self):
+ self.assertEqual(self.p.type, 23)
+ def testVersion(self):
+ self.assertEqual(self.p.version, 0x0301)
+ def testLength(self):
+ self.assertEqual(self.p.length, 8)
+ def testData(self):
+ self.assertEqual(self.p.data, 'abcdefgh')
+ def testInitialFlags(self):
+ self.assertTrue(self.p.compressed)
+ self.assertTrue(self.p.encrypted)
+ def testRepack(self):
+ p2 = TLSRecord(type=23, version=0x0301, data='abcdefgh')
+ self.assertEqual(p2.type, 23)
+ self.assertEqual(p2.version, 0x0301)
+ self.assertEqual(p2.length, 8)
+ self.assertEqual(p2.data, 'abcdefgh')
+ self.assertEqual(p2.pack(), self.p.pack())
+ def testTotalLength(self):
+ # that len(p) includes header
+ self.assertEqual(len(self.p), 13)
+ def testRaisesNeedDataWhenBufIsShort(self):
+ self.assertRaises(
+ dpkt.NeedData,
+ TLSRecord,
+ '\x16\x03\x01\x00\x10abc')
+
+
+class TLSChangeCipherSpecTest(unittest.TestCase):
+ "It's just a byte. This will be quick, I promise"
+ def setUp(self):
+ self.p = TLSChangeCipherSpec('\x01')
+ def testParses(self):
+ self.assertEqual(self.p.type, 1)
+ def testTotalLength(self):
+ self.assertEqual(len(self.p), 1)
+
+
+class TLSAppDataTest(unittest.TestCase):
+ "AppData is basically just a string"
+ def testValue(self):
+ d = TLSAppData('abcdefgh')
+ self.assertEqual(d, 'abcdefgh')
+
+
+class TLSHandshakeTest(unittest.TestCase):
+ def setUp(self):
+ self.h = TLSHandshake('\x00\x00\x00\x01\xff')
+ def testCreatedInsideMessage(self):
+ self.assertTrue(isinstance(self.h.data, TLSHelloRequest))
+ def testLength(self):
+ self.assertEqual(self.h.length, 0x01)
+ def testRaisesNeedData(self):
+ self.assertRaises(dpkt.NeedData, TLSHandshake, '\x00\x00\x01\x01')
+
+
+class ClientHelloTest(unittest.TestCase):
+ 'This data is extracted from and verified by Wireshark'
+
+ def setUp(self):
+ self.data = _hexdecode(
+ "01000199" # handshake header
+ "0301" # version
+ "5008220ce5e0e78b6891afe204498c9363feffbe03235a2d9e05b7d990eb708d" # rand
+ "2009bc0192e008e6fa8fe47998fca91311ba30ddde14a9587dc674b11c3d3e5ed1" # session id
+ # cipher suites
+ "005400ffc00ac0140088008700390038c00fc00500840035c007c009c011c0130045004400330032c00cc00ec002c0040096004100050004002fc008c01200160013c00dc003feff000ac006c010c00bc00100020001"
+ "0100" # compresssion methods
+ # extensions
+ "00fc0000000e000c0000096c6f63616c686f7374000a00080006001700180019000b00020100002300d0a50b2e9f618a9ea9bf493ef49b421835cd2f6b05bbe1179d8edf70d58c33d656e8696d36d7e7e0b9d3ecc0e4de339552fa06c64c0fcb550a334bc43944e2739ca342d15a9ebbe981ac87a0d38160507d47af09bdc16c5f0ee4cdceea551539382333226048a026d3a90a0535f4a64236467db8fee22b041af986ad0f253bc369137cd8d8cd061925461d7f4d7895ca9a4181ab554dad50360ac31860e971483877c9335ac1300c5e78f3e56f3b8e0fc16358fcaceefd5c8d8aaae7b35be116f8832856ca61144fcdd95e071b94d0cf7233740000"
+ "FFFFFFFFFFFFFFFF") # random garbage
+ self.p = TLSHandshake(self.data)
+
+ def testClientHelloConstructed(self):
+ 'Make sure the correct class was constructed'
+ #print self.p
+ self.assertTrue(isinstance(self.p.data, TLSClientHello))
+
+# def testClientDateCorrect(self):
+# self.assertEqual(self.p.random_unixtime, 1342710284)
+
+ def testClientRandomCorrect(self):
+ self.assertEqual(self.p.data.random,
+ _hexdecode('5008220ce5e0e78b6891afe204498c9363feffbe03235a2d9e05b7d990eb708d'))
+
+ def testCipherSuiteLength(self):
+ # we won't bother testing the identity of each cipher suite in the list.
+ self.assertEqual(self.p.data.num_ciphersuites, 42)
+ #self.assertEqual(len(self.p.ciphersuites), 42)
+
+ def testSessionId(self):
+ self.assertEqual(self.p.data.session_id,
+ _hexdecode('09bc0192e008e6fa8fe47998fca91311ba30ddde14a9587dc674b11c3d3e5ed1'))
+
+ def testCompressionMethods(self):
+ self.assertEqual(self.p.data.num_compression_methods, 1)
+
+ def testTotalLength(self):
+ self.assertEqual(len(self.p), 413)
+
+
+class ServerHelloTest(unittest.TestCase):
+ 'Again, from Wireshark'
+
+ def setUp(self):
+ self.data = _hexdecode('0200004d03015008220c8ec43c5462315a7c99f5d5b6bff009ad285b51dc18485f352e9fdecd2009bc0192e008e6fa8fe47998fca91311ba30ddde14a9587dc674b11c3d3e5ed10002000005ff01000100')
+ self.p = TLSHandshake(self.data)
+
+ def testConstructed(self):
+ self.assertTrue(isinstance(self.p.data, TLSServerHello))
+
+# def testDateCorrect(self):
+# self.assertEqual(self.p.random_unixtime, 1342710284)
+
+ def testRandomCorrect(self):
+ self.assertEqual(self.p.data.random,
+ _hexdecode('5008220c8ec43c5462315a7c99f5d5b6bff009ad285b51dc18485f352e9fdecd'))
+
+ def testCipherSuite(self):
+ self.assertEqual(
+ ssl_ciphersuites.BY_CODE[self.p.data.cipher_suite].name,
+ 'TLS_RSA_WITH_NULL_SHA')
+
+ def testTotalLength(self):
+ self.assertEqual(len(self.p), 81)
+
+
+class TLSMultiFactoryTest(unittest.TestCase):
+ "Made up test data"
+
+ def setUp(self):
+ self.data = _hexdecode('1703010010' # header 1
+ 'AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA' # data 1
+ '1703010010' # header 2
+ 'BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB' # data 2
+ '1703010010' # header 3
+ 'CCCCCCCC') # data 3 (incomplete)
+ self.msgs, self.bytes_parsed = TLSMultiFactory(self.data)
+
+ def testNumMessages(self):
+ # only complete messages should be parsed, incomplete ones left
+ # in buffer
+ self.assertEqual(len(self.msgs), 2)
+
+ def testBytesParsed(self):
+ self.assertEqual(self.bytes_parsed, (5 + 16) * 2)
+
+ def testFirstMsgData(self):
+ self.assertEqual(self.msgs[0].data, _hexdecode('AA' * 16))
+
+ def testSecondMsgData(self):
+ self.assertEqual(self.msgs[1].data, _hexdecode('BB' * 16))
+
+
+if __name__ == '__main__':
+ unittest.main()