## This file is part of Scapy ## See http://www.secdev.org/projects/scapy for more informations ## Copyright (C) Arnaud Ebalard ## This program is published under a GPLv2 license """ Cryptographic certificates. """ import os, sys, math, socket, struct, hmac, string, time, random, tempfile from subprocess import Popen, PIPE from scapy.utils import strxor try: HAS_HASHLIB=True import hashlib except: HAS_HASHLIB=False from Crypto.PublicKey import * from Crypto.Cipher import * from Crypto.Hash import * from Crypto.Util import number # Maximum allowed size in bytes for a certificate file, to avoid # loading huge file when importing a cert MAX_KEY_SIZE=50*1024 MAX_CERT_SIZE=50*1024 MAX_CRL_SIZE=10*1024*1024 # some are that big ##################################################################### # Some helpers ##################################################################### def popen3(cmd): p = Popen(cmd, shell=False, stdin=PIPE, stdout=PIPE, stderr=PIPE, close_fds=True) return p.stdout, p.stdin, p.stderr def warning(m): print("WARNING: %s" % m) def randstring(l): """ Returns a random string of length l (l >= 0) """ tmp = map(lambda x: struct.pack("B", random.randrange(0, 256, 1)), [""]*l) return "".join(tmp) def zerofree_randstring(l): """ Returns a random string of length l (l >= 0) without zero in it. """ tmp = map(lambda x: struct.pack("B", random.randrange(1, 256, 1)), [""]*l) return "".join(tmp) def strand(s1, s2): """ Returns the binary AND of the 2 provided strings s1 and s2. s1 and s2 must be of same length. """ return "".join(map(lambda x,y:chr(ord(x)&ord(y)), s1, s2)) # OS2IP function defined in RFC 3447 for octet string to integer conversion def pkcs_os2ip(x): """ Accepts a byte string as input parameter and return the associated long value: Input : x octet string to be converted Output: x corresponding nonnegative integer Reverse function is pkcs_i2osp() """ return number.bytes_to_long(x) # IP2OS function defined in RFC 3447 for octet string to integer conversion def pkcs_i2osp(x,xLen): """ Converts a long (the first parameter) to the associated byte string representation of length l (second parameter). Basically, the length parameters allow the function to perform the associated padding. Input : x nonnegative integer to be converted xLen intended length of the resulting octet string Output: x corresponding nonnegative integer Reverse function is pkcs_os2ip(). """ z = number.long_to_bytes(x) padlen = max(0, xLen-len(z)) return '\x00'*padlen + z # for every hash function a tuple is provided, giving access to # - hash output length in byte # - associated hash function that take data to be hashed as parameter # XXX I do not provide update() at the moment. # - DER encoding of the leading bits of digestInfo (the hash value # will be concatenated to create the complete digestInfo). # # Notes: # - MD4 asn.1 value should be verified. Also, as stated in # PKCS#1 v2.1, MD4 should not be used. # - hashlib is available from http://code.krypto.org/python/hashlib/ # - 'tls' one is the concatenation of both md5 and sha1 hashes used # by SSL/TLS when signing/verifying things _hashFuncParams = { "md2" : (16, lambda x: MD2.new(x).digest(), '\x30\x20\x30\x0c\x06\x08\x2a\x86\x48\x86\xf7\x0d\x02\x02\x05\x00\x04\x10'), "md4" : (16, lambda x: MD4.new(x).digest(), '\x30\x20\x30\x0c\x06\x08\x2a\x86\x48\x86\xf7\x0d\x02\x04\x05\x00\x04\x10'), # is that right ? "md5" : (16, lambda x: MD5.new(x).digest(), '\x30\x20\x30\x0c\x06\x08\x2a\x86\x48\x86\xf7\x0d\x02\x05\x05\x00\x04\x10'), "sha1" : (20, lambda x: SHA.new(x).digest(), '\x30\x21\x30\x09\x06\x05\x2b\x0e\x03\x02\x1a\x05\x00\x04\x14'), "tls" : (36, lambda x: MD5.new(x).digest() + SHA.new(x).digest(), '') } if HAS_HASHLIB: _hashFuncParams["sha224"] = (28, lambda x: hashlib.sha224(x).digest(), '\x30\x2d\x30\x0d\x06\x09\x60\x86\x48\x01\x65\x03\x04\x02\x04\x05\x00\x04\x1c') _hashFuncParams["sha256"] = (32, lambda x: hashlib.sha256(x).digest(), '\x30\x31\x30\x0d\x06\x09\x60\x86\x48\x01\x65\x03\x04\x02\x01\x05\x00\x04\x20') _hashFuncParams["sha384"] = (48, lambda x: hashlib.sha384(x).digest(), '\x30\x41\x30\x0d\x06\x09\x60\x86\x48\x01\x65\x03\x04\x02\x02\x05\x00\x04\x30') _hashFuncParams["sha512"] = (64, lambda x: hashlib.sha512(x).digest(), '\x30\x51\x30\x0d\x06\x09\x60\x86\x48\x01\x65\x03\x04\x02\x03\x05\x00\x04\x40') else: warning("hashlib support is not available. Consider installing it") warning("if you need sha224, sha256, sha384 and sha512 algs.") def pkcs_mgf1(mgfSeed, maskLen, h): """ Implements generic MGF1 Mask Generation function as described in Appendix B.2.1 of RFC 3447. The hash function is passed by name. valid values are 'md2', 'md4', 'md5', 'sha1', 'tls, 'sha256', 'sha384' and 'sha512'. Returns None on error. Input: mgfSeed: seed from which mask is generated, an octet string maskLen: intended length in octets of the mask, at most 2^32 * hLen hLen (see below) h : hash function name (in 'md2', 'md4', 'md5', 'sha1', 'tls', 'sha256', 'sha384'). hLen denotes the length in octets of the hash function output. Output: an octet string of length maskLen """ # steps are those of Appendix B.2.1 if not h in _hashFuncParams: warning("pkcs_mgf1: invalid hash (%s) provided") return None hLen = _hashFuncParams[h][0] hFunc = _hashFuncParams[h][1] if maskLen > 2**32 * hLen: # 1) warning("pkcs_mgf1: maskLen > 2**32 * hLen") return None T = "" # 2) maxCounter = math.ceil(float(maskLen) / float(hLen)) # 3) counter = 0 while counter < maxCounter: C = pkcs_i2osp(counter, 4) T += hFunc(mgfSeed + C) counter += 1 return T[:maskLen] def pkcs_emsa_pss_encode(M, emBits, h, mgf, sLen): """ Implements EMSA-PSS-ENCODE() function described in Sect. 9.1.1 of RFC 3447 Input: M : message to be encoded, an octet string emBits: maximal bit length of the integer resulting of pkcs_os2ip(EM), where EM is the encoded message, output of the function. h : hash function name (in 'md2', 'md4', 'md5', 'sha1', 'tls', 'sha256', 'sha384'). hLen denotes the length in octets of the hash function output. mgf : the mask generation function f : seed, maskLen -> mask sLen : intended length in octets of the salt Output: encoded message, an octet string of length emLen = ceil(emBits/8) On error, None is returned. """ # 1) is not done hLen = _hashFuncParams[h][0] # 2) hFunc = _hashFuncParams[h][1] mHash = hFunc(M) emLen = int(math.ceil(emBits/8.)) if emLen < hLen + sLen + 2: # 3) warning("encoding error (emLen < hLen + sLen + 2)") return None salt = randstring(sLen) # 4) MPrime = '\x00'*8 + mHash + salt # 5) H = hFunc(MPrime) # 6) PS = '\x00'*(emLen - sLen - hLen - 2) # 7) DB = PS + '\x01' + salt # 8) dbMask = mgf(H, emLen - hLen - 1) # 9) maskedDB = strxor(DB, dbMask) # 10) l = (8*emLen - emBits)/8 # 11) rem = 8*emLen - emBits - 8*l # additionnal bits andMask = l*'\x00' if rem: j = chr(reduce(lambda x,y: x+y, map(lambda x: 1< mask sLen : intended length in octets of the salt Output: True if the verification is ok, False otherwise. """ # 1) is not done hLen = _hashFuncParams[h][0] # 2) hFunc = _hashFuncParams[h][1] mHash = hFunc(M) emLen = int(math.ceil(emBits/8.)) # 3) if emLen < hLen + sLen + 2: return False if EM[-1] != '\xbc': # 4) return False l = emLen - hLen - 1 # 5) maskedDB = EM[:l] H = EM[l:l+hLen] l = (8*emLen - emBits)/8 # 6) rem = 8*emLen - emBits - 8*l # additionnal bits andMask = l*'\xff' if rem: val = reduce(lambda x,y: x+y, map(lambda x: 1< n-1: warning("Key._rsaep() expects a long between 0 and n-1") return None return self.key.encrypt(m, "")[0] def _rsaes_pkcs1_v1_5_encrypt(self, M): """ Implements RSAES-PKCS1-V1_5-ENCRYPT() function described in section 7.2.1 of RFC 3447. Input: M: message to be encrypted, an octet string of length mLen, where mLen <= k - 11 (k denotes the length in octets of the key modulus) Output: ciphertext, an octet string of length k On error, None is returned. """ # 1) Length checking mLen = len(M) k = self.modulusLen / 8 if mLen > k - 11: warning("Key._rsaes_pkcs1_v1_5_encrypt(): message too " "long (%d > %d - 11)" % (mLen, k)) return None # 2) EME-PKCS1-v1_5 encoding PS = zerofree_randstring(k - mLen - 3) # 2.a) EM = '\x00' + '\x02' + PS + '\x00' + M # 2.b) # 3) RSA encryption m = pkcs_os2ip(EM) # 3.a) c = self._rsaep(m) # 3.b) C = pkcs_i2osp(c, k) # 3.c) return C # 4) def _rsaes_oaep_encrypt(self, M, h=None, mgf=None, L=None): """ Internal method providing RSAES-OAEP-ENCRYPT as defined in Sect. 7.1.1 of RFC 3447. Not intended to be used directly. Please, see encrypt() method for type "OAEP". Input: M : message to be encrypted, an octet string of length mLen where mLen <= k - 2*hLen - 2 (k denotes the length in octets of the RSA modulus and hLen the length in octets of the hash function output) h : hash function name (in 'md2', 'md4', 'md5', 'sha1', 'tls', 'sha256', 'sha384'). hLen denotes the length in octets of the hash function output. 'sha1' is used by default if not provided. mgf: the mask generation function f : seed, maskLen -> mask L : optional label to be associated with the message; the default value for L, if not provided is the empty string Output: ciphertext, an octet string of length k On error, None is returned. """ # The steps below are the one described in Sect. 7.1.1 of RFC 3447. # 1) Length Checking # 1.a) is not done mLen = len(M) if h is None: h = "sha1" if not h in _hashFuncParams: warning("Key._rsaes_oaep_encrypt(): unknown hash function %s.", h) return None hLen = _hashFuncParams[h][0] hFun = _hashFuncParams[h][1] k = self.modulusLen / 8 if mLen > k - 2*hLen - 2: # 1.b) warning("Key._rsaes_oaep_encrypt(): message too long.") return None # 2) EME-OAEP encoding if L is None: # 2.a) L = "" lHash = hFun(L) PS = '\x00'*(k - mLen - 2*hLen - 2) # 2.b) DB = lHash + PS + '\x01' + M # 2.c) seed = randstring(hLen) # 2.d) if mgf is None: # 2.e) mgf = lambda x,y: pkcs_mgf1(x,y,h) dbMask = mgf(seed, k - hLen - 1) maskedDB = strxor(DB, dbMask) # 2.f) seedMask = mgf(maskedDB, hLen) # 2.g) maskedSeed = strxor(seed, seedMask) # 2.h) EM = '\x00' + maskedSeed + maskedDB # 2.i) # 3) RSA Encryption m = pkcs_os2ip(EM) # 3.a) c = self._rsaep(m) # 3.b) C = pkcs_i2osp(c, k) # 3.c) return C # 4) def encrypt(self, m, t=None, h=None, mgf=None, L=None): """ Encrypt message 'm' using 't' encryption scheme where 't' can be: - None: the message 'm' is directly applied the RSAEP encryption primitive, as described in PKCS#1 v2.1, i.e. RFC 3447 Sect 5.1.1. Simply put, the message undergo a modular exponentiation using the public key. Additionnal method parameters are just ignored. - 'pkcs': the message 'm' is applied RSAES-PKCS1-V1_5-ENCRYPT encryption scheme as described in section 7.2.1 of RFC 3447. In that context, other parameters ('h', 'mgf', 'l') are not used. - 'oaep': the message 'm' is applied the RSAES-OAEP-ENCRYPT encryption scheme, as described in PKCS#1 v2.1, i.e. RFC 3447 Sect 7.1.1. In that context, o 'h' parameter provides the name of the hash method to use. Possible values are "md2", "md4", "md5", "sha1", "tls", "sha224", "sha256", "sha384" and "sha512". if none is provided, sha1 is used. o 'mgf' is the mask generation function. By default, mgf is derived from the provided hash function using the generic MGF1 (see pkcs_mgf1() for details). o 'L' is the optional label to be associated with the message. If not provided, the default value is used, i.e the empty string. No check is done on the input limitation of the hash function regarding the size of 'L' (for instance, 2^61 - 1 for SHA-1). You have been warned. """ if t is None: # Raw encryption m = pkcs_os2ip(m) c = self._rsaep(m) return pkcs_i2osp(c, self.modulusLen/8) elif t == "pkcs": return self._rsaes_pkcs1_v1_5_encrypt(m) elif t == "oaep": return self._rsaes_oaep_encrypt(m, h, mgf, L) else: warning("Key.encrypt(): Unknown encryption type (%s) provided" % t) return None ### Below are verification related methods def _rsavp1(self, s): """ Internal method providing raw RSA verification, i.e. simple modular exponentiation of the given signature representative 'c', an integer between 0 and n-1. This is the signature verification primitive RSAVP1 described in PKCS#1 v2.1, i.e. RFC 3447 Sect. 5.2.2. Input: s: signature representative, an integer between 0 and n-1, where n is the key modulus. Output: message representative, an integer between 0 and n-1 Not intended to be used directly. Please, see verify() method. """ return self._rsaep(s) def _rsassa_pss_verify(self, M, S, h=None, mgf=None, sLen=None): """ Implements RSASSA-PSS-VERIFY() function described in Sect 8.1.2 of RFC 3447 Input: M: message whose signature is to be verified S: signature to be verified, an octet string of length k, where k is the length in octets of the RSA modulus n. Output: True is the signature is valid. False otherwise. """ # Set default parameters if not provided if h is None: # By default, sha1 h = "sha1" if not h in _hashFuncParams: warning("Key._rsassa_pss_verify(): unknown hash function " "provided (%s)" % h) return False if mgf is None: # use mgf1 with underlying hash function mgf = lambda x,y: pkcs_mgf1(x, y, h) if sLen is None: # use Hash output length (A.2.3 of RFC 3447) hLen = _hashFuncParams[h][0] sLen = hLen # 1) Length checking modBits = self.modulusLen k = modBits / 8 if len(S) != k: return False # 2) RSA verification s = pkcs_os2ip(S) # 2.a) m = self._rsavp1(s) # 2.b) emLen = math.ceil((modBits - 1) / 8.) # 2.c) EM = pkcs_i2osp(m, emLen) # 3) EMSA-PSS verification Result = pkcs_emsa_pss_verify(M, EM, modBits - 1, h, mgf, sLen) return Result # 4) def _rsassa_pkcs1_v1_5_verify(self, M, S, h): """ Implements RSASSA-PKCS1-v1_5-VERIFY() function as described in Sect. 8.2.2 of RFC 3447. Input: M: message whose signature is to be verified, an octet string S: signature to be verified, an octet string of length k, where k is the length in octets of the RSA modulus n h: hash function name (in 'md2', 'md4', 'md5', 'sha1', 'tls', 'sha256', 'sha384'). Output: True if the signature is valid. False otherwise. """ # 1) Length checking k = self.modulusLen / 8 if len(S) != k: warning("invalid signature (len(S) != k)") return False # 2) RSA verification s = pkcs_os2ip(S) # 2.a) m = self._rsavp1(s) # 2.b) EM = pkcs_i2osp(m, k) # 2.c) # 3) EMSA-PKCS1-v1_5 encoding EMPrime = pkcs_emsa_pkcs1_v1_5_encode(M, k, h) if EMPrime is None: warning("Key._rsassa_pkcs1_v1_5_verify(): unable to encode.") return False # 4) Comparison return EM == EMPrime def verify(self, M, S, t=None, h=None, mgf=None, sLen=None): """ Verify alleged signature 'S' is indeed the signature of message 'M' using 't' signature scheme where 't' can be: - None: the alleged signature 'S' is directly applied the RSAVP1 signature primitive, as described in PKCS#1 v2.1, i.e. RFC 3447 Sect 5.2.1. Simply put, the provided signature is applied a moular exponentiation using the public key. Then, a comparison of the result is done against 'M'. On match, True is returned. Additionnal method parameters are just ignored. - 'pkcs': the alleged signature 'S' and message 'M' are applied RSASSA-PKCS1-v1_5-VERIFY signature verification scheme as described in Sect. 8.2.2 of RFC 3447. In that context, the hash function name is passed using 'h'. Possible values are "md2", "md4", "md5", "sha1", "tls", "sha224", "sha256", "sha384" and "sha512". If none is provided, sha1 is used. Other additionnal parameters are ignored. - 'pss': the alleged signature 'S' and message 'M' are applied RSASSA-PSS-VERIFY signature scheme as described in Sect. 8.1.2. of RFC 3447. In that context, o 'h' parameter provides the name of the hash method to use. Possible values are "md2", "md4", "md5", "sha1", "tls", "sha224", "sha256", "sha384" and "sha512". if none is provided, sha1 is used. o 'mgf' is the mask generation function. By default, mgf is derived from the provided hash function using the generic MGF1 (see pkcs_mgf1() for details). o 'sLen' is the length in octet of the salt. You can overload the default value (the octet length of the hash value for provided algorithm) by providing another one with that parameter. """ if t is None: # RSAVP1 S = pkcs_os2ip(S) n = self.modulus if S > n-1: warning("Signature to be verified is too long for key modulus") return False m = self._rsavp1(S) if m is None: return False l = int(math.ceil(math.log(m, 2) / 8.)) # Hack m = pkcs_i2osp(m, l) return M == m elif t == "pkcs": # RSASSA-PKCS1-v1_5-VERIFY if h is None: h = "sha1" return self._rsassa_pkcs1_v1_5_verify(M, S, h) elif t == "pss": # RSASSA-PSS-VERIFY return self._rsassa_pss_verify(M, S, h, mgf, sLen) else: warning("Key.verify(): Unknown signature type (%s) provided" % t) return None class _DecryptAndSignMethods(OSSLHelper): ### Below are decryption related methods. Encryption ones are inherited ### from PubKey def _rsadp(self, c): """ Internal method providing raw RSA decryption, i.e. simple modular exponentiation of the given ciphertext representative 'c', a long between 0 and n-1. This is the decryption primitive RSADP described in PKCS#1 v2.1, i.e. RFC 3447 Sect. 5.1.2. Input: c: ciphertest representative, a long between 0 and n-1, where n is the key modulus. Output: ciphertext representative, a long between 0 and n-1 Not intended to be used directly. Please, see encrypt() method. """ n = self.modulus if type(c) is int: c = long(c) if type(c) is not long or c > n-1: warning("Key._rsaep() expects a long between 0 and n-1") return None return self.key.decrypt(c) def _rsaes_pkcs1_v1_5_decrypt(self, C): """ Implements RSAES-PKCS1-V1_5-DECRYPT() function described in section 7.2.2 of RFC 3447. Input: C: ciphertext to be decrypted, an octet string of length k, where k is the length in octets of the RSA modulus n. Output: an octet string of length k at most k - 11 on error, None is returned. """ # 1) Length checking cLen = len(C) k = self.modulusLen / 8 if cLen != k or k < 11: warning("Key._rsaes_pkcs1_v1_5_decrypt() decryption error " "(cLen != k or k < 11)") return None # 2) RSA decryption c = pkcs_os2ip(C) # 2.a) m = self._rsadp(c) # 2.b) EM = pkcs_i2osp(m, k) # 2.c) # 3) EME-PKCS1-v1_5 decoding # I am aware of the note at the end of 7.2.2 regarding error # conditions reporting but the one provided below are for _local_ # debugging purposes. --arno if EM[0] != '\x00': warning("Key._rsaes_pkcs1_v1_5_decrypt(): decryption error " "(first byte is not 0x00)") return None if EM[1] != '\x02': warning("Key._rsaes_pkcs1_v1_5_decrypt(): decryption error " "(second byte is not 0x02)") return None tmp = EM[2:].split('\x00', 1) if len(tmp) != 2: warning("Key._rsaes_pkcs1_v1_5_decrypt(): decryption error " "(no 0x00 to separate PS from M)") return None PS, M = tmp if len(PS) < 8: warning("Key._rsaes_pkcs1_v1_5_decrypt(): decryption error " "(PS is less than 8 byte long)") return None return M # 4) def _rsaes_oaep_decrypt(self, C, h=None, mgf=None, L=None): """ Internal method providing RSAES-OAEP-DECRYPT as defined in Sect. 7.1.2 of RFC 3447. Not intended to be used directly. Please, see encrypt() method for type "OAEP". Input: C : ciphertext to be decrypted, an octet string of length k, where k = 2*hLen + 2 (k denotes the length in octets of the RSA modulus and hLen the length in octets of the hash function output) h : hash function name (in 'md2', 'md4', 'md5', 'sha1', 'tls', 'sha256', 'sha384'). 'sha1' is used if none is provided. mgf: the mask generation function f : seed, maskLen -> mask L : optional label whose association with the message is to be verified; the default value for L, if not provided is the empty string. Output: message, an octet string of length k mLen, where mLen <= k - 2*hLen - 2 On error, None is returned. """ # The steps below are the one described in Sect. 7.1.2 of RFC 3447. # 1) Length Checking # 1.a) is not done if h is None: h = "sha1" if not h in _hashFuncParams: warning("Key._rsaes_oaep_decrypt(): unknown hash function %s.", h) return None hLen = _hashFuncParams[h][0] hFun = _hashFuncParams[h][1] k = self.modulusLen / 8 cLen = len(C) if cLen != k: # 1.b) warning("Key._rsaes_oaep_decrypt(): decryption error. " "(cLen != k)") return None if k < 2*hLen + 2: warning("Key._rsaes_oaep_decrypt(): decryption error. " "(k < 2*hLen + 2)") return None # 2) RSA decryption c = pkcs_os2ip(C) # 2.a) m = self._rsadp(c) # 2.b) EM = pkcs_i2osp(m, k) # 2.c) # 3) EME-OAEP decoding if L is None: # 3.a) L = "" lHash = hFun(L) Y = EM[:1] # 3.b) if Y != '\x00': warning("Key._rsaes_oaep_decrypt(): decryption error. " "(Y is not zero)") return None maskedSeed = EM[1:1+hLen] maskedDB = EM[1+hLen:] if mgf is None: mgf = lambda x,y: pkcs_mgf1(x, y, h) seedMask = mgf(maskedDB, hLen) # 3.c) seed = strxor(maskedSeed, seedMask) # 3.d) dbMask = mgf(seed, k - hLen - 1) # 3.e) DB = strxor(maskedDB, dbMask) # 3.f) # I am aware of the note at the end of 7.1.2 regarding error # conditions reporting but the one provided below are for _local_ # debugging purposes. --arno lHashPrime = DB[:hLen] # 3.g) tmp = DB[hLen:].split('\x01', 1) if len(tmp) != 2: warning("Key._rsaes_oaep_decrypt(): decryption error. " "(0x01 separator not found)") return None PS, M = tmp if PS != '\x00'*len(PS): warning("Key._rsaes_oaep_decrypt(): decryption error. " "(invalid padding string)") return None if lHash != lHashPrime: warning("Key._rsaes_oaep_decrypt(): decryption error. " "(invalid hash)") return None return M # 4) def decrypt(self, C, t=None, h=None, mgf=None, L=None): """ Decrypt ciphertext 'C' using 't' decryption scheme where 't' can be: - None: the ciphertext 'C' is directly applied the RSADP decryption primitive, as described in PKCS#1 v2.1, i.e. RFC 3447 Sect 5.1.2. Simply, put the message undergo a modular exponentiation using the private key. Additionnal method parameters are just ignored. - 'pkcs': the ciphertext 'C' is applied RSAES-PKCS1-V1_5-DECRYPT decryption scheme as described in section 7.2.2 of RFC 3447. In that context, other parameters ('h', 'mgf', 'l') are not used. - 'oaep': the ciphertext 'C' is applied the RSAES-OAEP-DECRYPT decryption scheme, as described in PKCS#1 v2.1, i.e. RFC 3447 Sect 7.1.2. In that context, o 'h' parameter provides the name of the hash method to use. Possible values are "md2", "md4", "md5", "sha1", "tls", "sha224", "sha256", "sha384" and "sha512". if none is provided, sha1 is used by default. o 'mgf' is the mask generation function. By default, mgf is derived from the provided hash function using the generic MGF1 (see pkcs_mgf1() for details). o 'L' is the optional label to be associated with the message. If not provided, the default value is used, i.e the empty string. No check is done on the input limitation of the hash function regarding the size of 'L' (for instance, 2^61 - 1 for SHA-1). You have been warned. """ if t is None: C = pkcs_os2ip(C) c = self._rsadp(C) l = int(math.ceil(math.log(c, 2) / 8.)) # Hack return pkcs_i2osp(c, l) elif t == "pkcs": return self._rsaes_pkcs1_v1_5_decrypt(C) elif t == "oaep": return self._rsaes_oaep_decrypt(C, h, mgf, L) else: warning("Key.decrypt(): Unknown decryption type (%s) provided" % t) return None ### Below are signature related methods. Verification ones are inherited from ### PubKey def _rsasp1(self, m): """ Internal method providing raw RSA signature, i.e. simple modular exponentiation of the given message representative 'm', an integer between 0 and n-1. This is the signature primitive RSASP1 described in PKCS#1 v2.1, i.e. RFC 3447 Sect. 5.2.1. Input: m: message representative, an integer between 0 and n-1, where n is the key modulus. Output: signature representative, an integer between 0 and n-1 Not intended to be used directly. Please, see sign() method. """ return self._rsadp(m) def _rsassa_pss_sign(self, M, h=None, mgf=None, sLen=None): """ Implements RSASSA-PSS-SIGN() function described in Sect. 8.1.1 of RFC 3447. Input: M: message to be signed, an octet string Output: signature, an octet string of length k, where k is the length in octets of the RSA modulus n. On error, None is returned. """ # Set default parameters if not provided if h is None: # By default, sha1 h = "sha1" if not h in _hashFuncParams: warning("Key._rsassa_pss_sign(): unknown hash function " "provided (%s)" % h) return None if mgf is None: # use mgf1 with underlying hash function mgf = lambda x,y: pkcs_mgf1(x, y, h) if sLen is None: # use Hash output length (A.2.3 of RFC 3447) hLen = _hashFuncParams[h][0] sLen = hLen # 1) EMSA-PSS encoding modBits = self.modulusLen k = modBits / 8 EM = pkcs_emsa_pss_encode(M, modBits - 1, h, mgf, sLen) if EM is None: warning("Key._rsassa_pss_sign(): unable to encode") return None # 2) RSA signature m = pkcs_os2ip(EM) # 2.a) s = self._rsasp1(m) # 2.b) S = pkcs_i2osp(s, k) # 2.c) return S # 3) def _rsassa_pkcs1_v1_5_sign(self, M, h): """ Implements RSASSA-PKCS1-v1_5-SIGN() function as described in Sect. 8.2.1 of RFC 3447. Input: M: message to be signed, an octet string h: hash function name (in 'md2', 'md4', 'md5', 'sha1', 'tls' 'sha256', 'sha384'). Output: the signature, an octet string. """ # 1) EMSA-PKCS1-v1_5 encoding k = self.modulusLen / 8 EM = pkcs_emsa_pkcs1_v1_5_encode(M, k, h) if EM is None: warning("Key._rsassa_pkcs1_v1_5_sign(): unable to encode") return None # 2) RSA signature m = pkcs_os2ip(EM) # 2.a) s = self._rsasp1(m) # 2.b) S = pkcs_i2osp(s, k) # 2.c) return S # 3) def sign(self, M, t=None, h=None, mgf=None, sLen=None): """ Sign message 'M' using 't' signature scheme where 't' can be: - None: the message 'M' is directly applied the RSASP1 signature primitive, as described in PKCS#1 v2.1, i.e. RFC 3447 Sect 5.2.1. Simply put, the message undergo a modular exponentiation using the private key. Additionnal method parameters are just ignored. - 'pkcs': the message 'M' is applied RSASSA-PKCS1-v1_5-SIGN signature scheme as described in Sect. 8.2.1 of RFC 3447. In that context, the hash function name is passed using 'h'. Possible values are "md2", "md4", "md5", "sha1", "tls", "sha224", "sha256", "sha384" and "sha512". If none is provided, sha1 is used. Other additionnal parameters are ignored. - 'pss' : the message 'M' is applied RSASSA-PSS-SIGN signature scheme as described in Sect. 8.1.1. of RFC 3447. In that context, o 'h' parameter provides the name of the hash method to use. Possible values are "md2", "md4", "md5", "sha1", "tls", "sha224", "sha256", "sha384" and "sha512". if none is provided, sha1 is used. o 'mgf' is the mask generation function. By default, mgf is derived from the provided hash function using the generic MGF1 (see pkcs_mgf1() for details). o 'sLen' is the length in octet of the salt. You can overload the default value (the octet length of the hash value for provided algorithm) by providing another one with that parameter. """ if t is None: # RSASP1 M = pkcs_os2ip(M) n = self.modulus if M > n-1: warning("Message to be signed is too long for key modulus") return None s = self._rsasp1(M) if s is None: return None return pkcs_i2osp(s, self.modulusLen/8) elif t == "pkcs": # RSASSA-PKCS1-v1_5-SIGN if h is None: h = "sha1" return self._rsassa_pkcs1_v1_5_sign(M, h) elif t == "pss": # RSASSA-PSS-SIGN return self._rsassa_pss_sign(M, h, mgf, sLen) else: warning("Key.sign(): Unknown signature type (%s) provided" % t) return None def openssl_parse_RSA(fmt="PEM"): return popen3(['openssl', 'rsa', '-text', '-pubin', '-inform', fmt, '-noout']) def openssl_convert_RSA(infmt="PEM", outfmt="DER"): return ['openssl', 'rsa', '-pubin', '-inform', infmt, '-outform', outfmt] class PubKey(OSSLHelper, _EncryptAndVerify): # Below are the fields we recognize in the -text output of openssl # and from which we extract information. We expect them in that # order. Number of spaces does matter. possible_fields = [ "Modulus (", "Exponent:" ] possible_fields_count = len(possible_fields) def __init__(self, keypath): error_msg = "Unable to import key." # XXX Temporary hack to use PubKey inside Cert if type(keypath) is tuple: e, m, mLen = keypath self.modulus = m self.modulusLen = mLen self.pubExp = e return fields_dict = {} for k in self.possible_fields: fields_dict[k] = None self.keypath = None rawkey = None if (not '\x00' in keypath) and os.path.isfile(keypath): # file self.keypath = keypath key_size = os.path.getsize(keypath) if key_size > MAX_KEY_SIZE: raise Exception(error_msg) try: f = open(keypath) rawkey = f.read() f.close() except: raise Exception(error_msg) else: rawkey = keypath if rawkey is None: raise Exception(error_msg) self.rawkey = rawkey key_header = "-----BEGIN PUBLIC KEY-----" key_footer = "-----END PUBLIC KEY-----" l = rawkey.split(key_header, 1) if len(l) == 2: # looks like PEM tmp = l[1] l = tmp.split(key_footer, 1) if len(l) == 2: tmp = l[0] rawkey = "%s%s%s\n" % (key_header, tmp, key_footer) else: raise Exception(error_msg) r,w,e = openssl_parse_RSA("PEM") w.write(rawkey) w.close() textkey = r.read() r.close() res = e.read() e.close() if res == '': self.format = "PEM" self.pemkey = rawkey self.textkey = textkey cmd = openssl_convert_RSA_cmd("PEM", "DER") self.derkey = self._apply_ossl_cmd(cmd, rawkey) else: raise Exception(error_msg) else: # not PEM, try DER r,w,e = openssl_parse_RSA("DER") w.write(rawkey) w.close() textkey = r.read() r.close() res = e.read() if res == '': self.format = "DER" self.derkey = rawkey self.textkey = textkey cmd = openssl_convert_RSA_cmd("DER", "PEM") self.pemkey = self._apply_ossl_cmd(cmd, rawkey) cmd = openssl_convert_RSA_cmd("DER", "DER") self.derkey = self._apply_ossl_cmd(cmd, rawkey) else: try: # Perhaps it is a cert c = Cert(keypath) except: raise Exception(error_msg) # TODO: # Reconstruct a key (der and pem) and provide: # self.format # self.derkey # self.pemkey # self.textkey # self.keypath self.osslcmdbase = ['openssl', 'rsa', '-pubin', '-inform', self.format] self.keypath = keypath # Parse the -text output of openssl to make things available l = self.textkey.split('\n', 1) if len(l) != 2: raise Exception(error_msg) cur, tmp = l i = 0 k = self.possible_fields[i] # Modulus ( cur = cur[len(k):] + '\n' while k: l = tmp.split('\n', 1) if len(l) != 2: # Over fields_dict[k] = cur break l, tmp = l newkey = 0 # skip fields we have already seen, this is the purpose of 'i' for j in range(i, self.possible_fields_count): f = self.possible_fields[j] if l.startswith(f): fields_dict[k] = cur cur = l[len(f):] + '\n' k = f newkey = 1 i = j+1 break if newkey == 1: continue cur += l + '\n' # modulus and modulus length v = fields_dict["Modulus ("] self.modulusLen = None if v: v, rem = v.split(' bit):', 1) self.modulusLen = int(v) rem = rem.replace('\n','').replace(' ','').replace(':','') self.modulus = long(rem, 16) if self.modulus is None: raise Exception(error_msg) # public exponent v = fields_dict["Exponent:"] self.pubExp = None if v: self.pubExp = long(v.split('(', 1)[0]) if self.pubExp is None: raise Exception(error_msg) self.key = RSA.construct((self.modulus, self.pubExp, )) def __str__(self): return self.derkey class Key(_DecryptAndSignMethods, _EncryptAndVerify): # Below are the fields we recognize in the -text output of openssl # and from which we extract information. We expect them in that # order. Number of spaces does matter. possible_fields = [ "Private-Key: (", "modulus:", "publicExponent:", "privateExponent:", "prime1:", "prime2:", "exponent1:", "exponent2:", "coefficient:" ] possible_fields_count = len(possible_fields) def __init__(self, keypath): error_msg = "Unable to import key." fields_dict = {} for k in self.possible_fields: fields_dict[k] = None self.keypath = None rawkey = None if (not '\x00' in keypath) and os.path.isfile(keypath): self.keypath = keypath key_size = os.path.getsize(keypath) if key_size > MAX_KEY_SIZE: raise Exception(error_msg) try: f = open(keypath) rawkey = f.read() f.close() except: raise Exception(error_msg) else: rawkey = keypath if rawkey is None: raise Exception(error_msg) self.rawkey = rawkey # Let's try to get file format : PEM or DER. fmtstr = 'openssl rsa -text -inform %s -noout' convertstr = 'openssl rsa -inform %s -outform %s' key_header = "-----BEGIN RSA PRIVATE KEY-----" key_footer = "-----END RSA PRIVATE KEY-----" l = rawkey.split(key_header, 1) if len(l) == 2: # looks like PEM tmp = l[1] l = tmp.split(key_footer, 1) if len(l) == 2: tmp = l[0] rawkey = "%s%s%s\n" % (key_header, tmp, key_footer) else: raise Exception(error_msg) r,w,e = popen3((fmtstr % "PEM").split(" ")) w.write(rawkey) w.close() textkey = r.read() r.close() res = e.read() e.close() if res == '': self.format = "PEM" self.pemkey = rawkey self.textkey = textkey cmd = (convertstr % ("PEM", "DER")).split(" ") self.derkey = self._apply_ossl_cmd(cmd, rawkey) else: raise Exception(error_msg) else: # not PEM, try DER r,w,e = popen3((fmtstr % "DER").split(" ")) w.write(rawkey) w.close() textkey = r.read() r.close() res = e.read() if res == '': self.format = "DER" self.derkey = rawkey self.textkey = textkey cmd = (convertstr % ("DER", "PEM")).split(" ") self.pemkey = self._apply_ossl_cmd(cmd, rawkey) cmd = (convertstr % ("DER", "DER")).split(" ") self.derkey = self._apply_ossl_cmd(cmd, rawkey) else: raise Exception(error_msg) self.osslcmdbase = ['openssl', 'rsa', '-inform', self.format] r,w,e = popen3(["openssl", "asn1parse", "-inform", "DER"]) w.write(self.derkey) w.close() self.asn1parsekey = r.read() r.close() res = e.read() e.close() if res != '': raise Exception(error_msg) self.keypath = keypath # Parse the -text output of openssl to make things available l = self.textkey.split('\n', 1) if len(l) != 2: raise Exception(error_msg) cur, tmp = l i = 0 k = self.possible_fields[i] # Private-Key: ( cur = cur[len(k):] + '\n' while k: l = tmp.split('\n', 1) if len(l) != 2: # Over fields_dict[k] = cur break l, tmp = l newkey = 0 # skip fields we have already seen, this is the purpose of 'i' for j in range(i, self.possible_fields_count): f = self.possible_fields[j] if l.startswith(f): fields_dict[k] = cur cur = l[len(f):] + '\n' k = f newkey = 1 i = j+1 break if newkey == 1: continue cur += l + '\n' # modulus length v = fields_dict["Private-Key: ("] self.modulusLen = None if v: self.modulusLen = int(v.split(' bit', 1)[0]) if self.modulusLen is None: raise Exception(error_msg) # public exponent v = fields_dict["publicExponent:"] self.pubExp = None if v: self.pubExp = long(v.split('(', 1)[0]) if self.pubExp is None: raise Exception(error_msg) tmp = {} for k in ["modulus:", "privateExponent:", "prime1:", "prime2:", "exponent1:", "exponent2:", "coefficient:"]: v = fields_dict[k] if v: s = v.replace('\n', '').replace(' ', '').replace(':', '') tmp[k] = long(s, 16) else: raise Exception(error_msg) self.modulus = tmp["modulus:"] self.privExp = tmp["privateExponent:"] self.prime1 = tmp["prime1:"] self.prime2 = tmp["prime2:"] self.exponent1 = tmp["exponent1:"] self.exponent2 = tmp["exponent2:"] self.coefficient = tmp["coefficient:"] self.key = RSA.construct((self.modulus, self.pubExp, self.privExp)) def __str__(self): return self.derkey # We inherit from PubKey to get access to all encryption and verification # methods. To have that working, we simply need Cert to provide # modulusLen and key attribute. # XXX Yes, it is a hack. class Cert(OSSLHelper, _EncryptAndVerify): # Below are the fields we recognize in the -text output of openssl # and from which we extract information. We expect them in that # order. Number of spaces does matter. possible_fields = [ " Version:", " Serial Number:", " Signature Algorithm:", " Issuer:", " Not Before:", " Not After :", " Subject:", " Public Key Algorithm:", " Modulus (", " Exponent:", " X509v3 Subject Key Identifier:", " X509v3 Authority Key Identifier:", " keyid:", " DirName:", " serial:", " X509v3 Basic Constraints:", " X509v3 Key Usage:", " X509v3 Extended Key Usage:", " X509v3 CRL Distribution Points:", " Authority Information Access:", " Signature Algorithm:" ] possible_fields_count = len(possible_fields) def __init__(self, certpath): error_msg = "Unable to import certificate." fields_dict = {} for k in self.possible_fields: fields_dict[k] = None self.certpath = None rawcert = None if (not '\x00' in certpath) and os.path.isfile(certpath): # file self.certpath = certpath cert_size = os.path.getsize(certpath) if cert_size > MAX_CERT_SIZE: raise Exception(error_msg) try: f = open(certpath) rawcert = f.read() f.close() except: raise Exception(error_msg) else: rawcert = certpath if rawcert is None: raise Exception(error_msg) self.rawcert = rawcert # Let's try to get file format : PEM or DER. fmtstr = 'openssl x509 -text -inform %s -noout' convertstr = 'openssl x509 -inform %s -outform %s' cert_header = "-----BEGIN CERTIFICATE-----" cert_footer = "-----END CERTIFICATE-----" l = rawcert.split(cert_header, 1) if len(l) == 2: # looks like PEM tmp = l[1] l = tmp.split(cert_footer, 1) if len(l) == 2: tmp = l[0] rawcert = "%s%s%s\n" % (cert_header, tmp, cert_footer) else: raise Exception(error_msg) r,w,e = popen3((fmtstr % "PEM").split(" ")) w.write(rawcert) w.close() textcert = r.read() r.close() res = e.read() e.close() if res == '': self.format = "PEM" self.pemcert = rawcert self.textcert = textcert cmd = (convertstr % ("PEM", "DER")).split(" ") self.dercert = self._apply_ossl_cmd(cmd, rawcert) else: raise Exception(error_msg) else: # not PEM, try DER r,w,e = popen3((fmtstr % "DER").split(" ")) w.write(rawcert) w.close() textcert = r.read() r.close() res = e.read() if res == '': self.format = "DER" self.dercert = rawcert self.textcert = textcert cmd = (convertstr % ("DER", "PEM")).split(" ") self.pemcert = self._apply_ossl_cmd(cmd, rawcert) cmd = (convertstr % ("DER", "DER")).split(" ") self.dercert = self._apply_ossl_cmd(cmd, rawcert) else: raise Exception(error_msg) self.osslcmdbase = ['openssl', 'x509', '-inform', self.format] r,w,e = popen3('openssl asn1parse -inform DER'.split(' ')) w.write(self.dercert) w.close() self.asn1parsecert = r.read() r.close() res = e.read() e.close() if res != '': raise Exception(error_msg) # Grab _raw_ X509v3 Authority Key Identifier, if any. tmp = self.asn1parsecert.split(":X509v3 Authority Key Identifier", 1) self.authorityKeyID = None if len(tmp) == 2: tmp = tmp[1] tmp = tmp.split("[HEX DUMP]:", 1)[1] self.authorityKeyID=tmp.split('\n',1)[0] # Grab _raw_ X509v3 Subject Key Identifier, if any. tmp = self.asn1parsecert.split(":X509v3 Subject Key Identifier", 1) self.subjectKeyID = None if len(tmp) == 2: tmp = tmp[1] tmp = tmp.split("[HEX DUMP]:", 1)[1] self.subjectKeyID=tmp.split('\n',1)[0] # Get tbsCertificate using the worst hack. output of asn1parse # looks like that: # # 0:d=0 hl=4 l=1298 cons: SEQUENCE # 4:d=1 hl=4 l=1018 cons: SEQUENCE # ... # l1,l2 = self.asn1parsecert.split('\n', 2)[:2] hl1 = int(l1.split("hl=",1)[1].split("l=",1)[0]) rem = l2.split("hl=",1)[1] hl2, rem = rem.split("l=",1) hl2 = int(hl2) l = int(rem.split("cons",1)[0]) self.tbsCertificate = self.dercert[hl1:hl1+hl2+l] # Parse the -text output of openssl to make things available tmp = self.textcert.split('\n', 2)[2] l = tmp.split('\n', 1) if len(l) != 2: raise Exception(error_msg) cur, tmp = l i = 0 k = self.possible_fields[i] # Version: cur = cur[len(k):] + '\n' while k: l = tmp.split('\n', 1) if len(l) != 2: # Over fields_dict[k] = cur break l, tmp = l newkey = 0 # skip fields we have already seen, this is the purpose of 'i' for j in range(i, self.possible_fields_count): f = self.possible_fields[j] if l.startswith(f): fields_dict[k] = cur cur = l[len(f):] + '\n' k = f newkey = 1 i = j+1 break if newkey == 1: continue cur += l + '\n' # version v = fields_dict[" Version:"] self.version = None if v: self.version = int(v[1:2]) if self.version is None: raise Exception(error_msg) # serial number v = fields_dict[" Serial Number:"] self.serial = None if v: v = v.replace('\n', '').strip() if "0x" in v: v = v.split("0x", 1)[1].split(')', 1)[0] v = v.replace(':', '').upper() if len(v) % 2: v = '0' + v self.serial = v if self.serial is None: raise Exception(error_msg) # Signature Algorithm v = fields_dict[" Signature Algorithm:"] self.sigAlg = None if v: v = v.split('\n',1)[0] v = v.strip() self.sigAlg = v if self.sigAlg is None: raise Exception(error_msg) # issuer v = fields_dict[" Issuer:"] self.issuer = None if v: v = v.split('\n',1)[0] v = v.strip() self.issuer = v if self.issuer is None: raise Exception(error_msg) # not before v = fields_dict[" Not Before:"] self.notBefore_str = None if v: v = v.split('\n',1)[0] v = v.strip() self.notBefore_str = v if self.notBefore_str is None: raise Exception(error_msg) try: self.notBefore = time.strptime(self.notBefore_str, "%b %d %H:%M:%S %Y %Z") except: self.notBefore = time.strptime(self.notBefore_str, "%b %d %H:%M:%S %Y") self.notBefore_str_simple = time.strftime("%x", self.notBefore) # not after v = fields_dict[" Not After :"] self.notAfter_str = None if v: v = v.split('\n',1)[0] v = v.strip() self.notAfter_str = v if self.notAfter_str is None: raise Exception(error_msg) try: self.notAfter = time.strptime(self.notAfter_str, "%b %d %H:%M:%S %Y %Z") except: self.notAfter = time.strptime(self.notAfter_str, "%b %d %H:%M:%S %Y") self.notAfter_str_simple = time.strftime("%x", self.notAfter) # subject v = fields_dict[" Subject:"] self.subject = None if v: v = v.split('\n',1)[0] v = v.strip() self.subject = v if self.subject is None: raise Exception(error_msg) # Public Key Algorithm v = fields_dict[" Public Key Algorithm:"] self.pubKeyAlg = None if v: v = v.split('\n',1)[0] v = v.strip() self.pubKeyAlg = v if self.pubKeyAlg is None: raise Exception(error_msg) # Modulus v = fields_dict[" Modulus ("] self.modulus = None if v: v,t = v.split(' bit):',1) self.modulusLen = int(v) t = t.replace(' ', '').replace('\n', ''). replace(':', '') self.modulus_hexdump = t self.modulus = long(t, 16) if self.modulus is None: raise Exception(error_msg) # Exponent v = fields_dict[" Exponent:"] self.exponent = None if v: v = v.split('(',1)[0] self.exponent = long(v) if self.exponent is None: raise Exception(error_msg) # Public Key instance self.key = RSA.construct((self.modulus, self.exponent, )) # Subject Key Identifier # Authority Key Identifier: keyid, dirname and serial self.authorityKeyID_keyid = None self.authorityKeyID_dirname = None self.authorityKeyID_serial = None if self.authorityKeyID: # (hex version already done using asn1parse) v = fields_dict[" keyid:"] if v: v = v.split('\n',1)[0] v = v.strip().replace(':', '') self.authorityKeyID_keyid = v v = fields_dict[" DirName:"] if v: v = v.split('\n',1)[0] self.authorityKeyID_dirname = v v = fields_dict[" serial:"] if v: v = v.split('\n',1)[0] v = v.strip().replace(':', '') self.authorityKeyID_serial = v # Basic constraints self.basicConstraintsCritical = False self.basicConstraints=None v = fields_dict[" X509v3 Basic Constraints:"] if v: self.basicConstraints = {} v,t = v.split('\n',2)[:2] if "critical" in v: self.basicConstraintsCritical = True if "CA:" in t: self.basicConstraints["CA"] = t.split('CA:')[1][:4] == "TRUE" if "pathlen:" in t: self.basicConstraints["pathlen"] = int(t.split('pathlen:')[1]) # X509v3 Key Usage self.keyUsage = [] v = fields_dict[" X509v3 Key Usage:"] if v: # man 5 x509v3_config ku_mapping = {"Digital Signature": "digitalSignature", "Non Repudiation": "nonRepudiation", "Key Encipherment": "keyEncipherment", "Data Encipherment": "dataEncipherment", "Key Agreement": "keyAgreement", "Certificate Sign": "keyCertSign", "CRL Sign": "cRLSign", "Encipher Only": "encipherOnly", "Decipher Only": "decipherOnly"} v = v.split('\n',2)[1] l = map(lambda x: x.strip(), v.split(',')) while l: c = l.pop() if c in ku_mapping: self.keyUsage.append(ku_mapping[c]) else: self.keyUsage.append(c) # Add it anyway print("Found unknown X509v3 Key Usage: '%s'" % c) print("Report it to arno (at) natisbad.org for addition") # X509v3 Extended Key Usage self.extKeyUsage = [] v = fields_dict[" X509v3 Extended Key Usage:"] if v: # man 5 x509v3_config: eku_mapping = {"TLS Web Server Authentication": "serverAuth", "TLS Web Client Authentication": "clientAuth", "Code Signing": "codeSigning", "E-mail Protection": "emailProtection", "Time Stamping": "timeStamping", "Microsoft Individual Code Signing": "msCodeInd", "Microsoft Commercial Code Signing": "msCodeCom", "Microsoft Trust List Signing": "msCTLSign", "Microsoft Encrypted File System": "msEFS", "Microsoft Server Gated Crypto": "msSGC", "Netscape Server Gated Crypto": "nsSGC", "IPSec End System": "iPsecEndSystem", "IPSec Tunnel": "iPsecTunnel", "IPSec User": "iPsecUser"} v = v.split('\n',2)[1] l = map(lambda x: x.strip(), v.split(',')) while l: c = l.pop() if c in eku_mapping: self.extKeyUsage.append(eku_mapping[c]) else: self.extKeyUsage.append(c) # Add it anyway print("Found unknown X509v3 Extended Key Usage: '%s'" % c) print("Report it to arno (at) natisbad.org for addition") # CRL Distribution points self.cRLDistributionPoints = [] v = fields_dict[" X509v3 CRL Distribution Points:"] if v: v = v.split("\n\n", 1)[0] v = v.split("URI:")[1:] self.CRLDistributionPoints = map(lambda x: x.strip(), v) # Authority Information Access: list of tuples ("method", "location") self.authorityInfoAccess = [] v = fields_dict[" Authority Information Access:"] if v: v = v.split("\n\n", 1)[0] v = v.split("\n")[1:] for e in v: method, location = map(lambda x: x.strip(), e.split(" - ", 1)) self.authorityInfoAccess.append((method, location)) # signature field v = fields_dict[" Signature Algorithm:" ] self.sig = None if v: v = v.split('\n',1)[1] v = v.replace(' ', '').replace('\n', '') self.sig = "".join(map(lambda x: chr(int(x, 16)), v.split(':'))) self.sigLen = len(self.sig) if self.sig is None: raise Exception(error_msg) def isIssuerCert(self, other): """ True if 'other' issued 'self', i.e.: - self.issuer == other.subject - self is signed by other """ # XXX should be done on raw values, instead of their textual repr if self.issuer != other.subject: return False # Sanity check regarding modulus length and the # signature length keyLen = (other.modulusLen + 7)/8 if keyLen != self.sigLen: return False unenc = other.encrypt(self.sig) # public key encryption, i.e. decrypt # XXX Check block type (00 or 01 and type of padding) unenc = unenc[1:] if not '\x00' in unenc: return False pos = unenc.index('\x00') unenc = unenc[pos+1:] found = None for k in _hashFuncParams.keys(): if self.sigAlg.startswith(k): found = k break if not found: return False hlen, hfunc, digestInfo = _hashFuncParams[k] if len(unenc) != (hlen+len(digestInfo)): return False if not unenc.startswith(digestInfo): return False h = unenc[-hlen:] myh = hfunc(self.tbsCertificate) return h == myh def chain(self, certlist): """ Construct the chain of certificates leading from 'self' to the self signed root using the certificates in 'certlist'. If the list does not provide all the required certs to go to the root the function returns a incomplete chain starting with the certificate. This fact can be tested by tchecking if the last certificate of the returned chain is self signed (if c is the result, c[-1].isSelfSigned()) """ d = {} for c in certlist: # XXX we should check if we have duplicate d[c.subject] = c res = [self] cur = self while not cur.isSelfSigned(): if cur.issuer in d: possible_issuer = d[cur.issuer] if cur.isIssuerCert(possible_issuer): res.append(possible_issuer) cur = possible_issuer else: break return res def remainingDays(self, now=None): """ Based on the value of notBefore field, returns the number of days the certificate will still be valid. The date used for the comparison is the current and local date, as returned by time.localtime(), except if 'now' argument is provided another one. 'now' argument can be given as either a time tuple or a string representing the date. Accepted format for the string version are: - '%b %d %H:%M:%S %Y %Z' e.g. 'Jan 30 07:38:59 2008 GMT' - '%m/%d/%y' e.g. '01/30/08' (less precise) If the certificate is no more valid at the date considered, then, a negative value is returned representing the number of days since it has expired. The number of days is returned as a float to deal with the unlikely case of certificates that are still just valid. """ if now is None: now = time.localtime() elif type(now) is str: try: if '/' in now: now = time.strptime(now, '%m/%d/%y') else: now = time.strptime(now, '%b %d %H:%M:%S %Y %Z') except: warning("Bad time string provided '%s'. Using current time" % now) now = time.localtime() now = time.mktime(now) nft = time.mktime(self.notAfter) diff = (nft - now)/(24.*3600) return diff # return SHA-1 hash of cert embedded public key # !! At the moment, the trailing 0 is in the hashed string if any def keyHash(self): m = self.modulus_hexdump res = [] i = 0 l = len(m) while i MAX_CRL_SIZE: raise Exception(error_msg) try: f = open(crlpath) rawcrl = f.read() f.close() except: raise Exception(error_msg) else: rawcrl = crlpath if rawcrl is None: raise Exception(error_msg) self.rawcrl = rawcrl # Let's try to get file format : PEM or DER. fmtstr = 'openssl crl -text -inform %s -noout' convertstr = 'openssl crl -inform %s -outform %s' crl_header = "-----BEGIN X509 CRL-----" crl_footer = "-----END X509 CRL-----" l = rawcrl.split(crl_header, 1) if len(l) == 2: # looks like PEM tmp = l[1] l = tmp.split(crl_footer, 1) if len(l) == 2: tmp = l[0] rawcrl = "%s%s%s\n" % (crl_header, tmp, crl_footer) else: raise Exception(error_msg) r,w,e = popen3((fmtstr % "PEM").split(" ")) w.write(rawcrl) w.close() textcrl = r.read() r.close() res = e.read() e.close() if res == '': self.format = "PEM" self.pemcrl = rawcrl self.textcrl = textcrl cmd = (convertstr % ("PEM", "DER")).split(" ") self.dercrl = self._apply_ossl_cmd(cmd, rawcrl) else: raise Exception(error_msg) else: # not PEM, try DER r,w,e = popen3((fmtstr % "DER").split(' ')) w.write(rawcrl) w.close() textcrl = r.read() r.close() res = e.read() if res == '': self.format = "DER" self.dercrl = rawcrl self.textcrl = textcrl cmd = (convertstr % ("DER", "PEM")).split(" ") self.pemcrl = self._apply_ossl_cmd(cmd, rawcrl) cmd = (convertstr % ("DER", "DER")).split(" ") self.dercrl = self._apply_ossl_cmd(cmd, rawcrl) else: raise Exception(error_msg) self.osslcmdbase = ['openssl', 'crl', '-inform', self.format] r,w,e = popen3(('openssl asn1parse -inform DER').split(" ")) w.write(self.dercrl) w.close() self.asn1parsecrl = r.read() r.close() res = e.read() e.close() if res != '': raise Exception(error_msg) # Grab _raw_ X509v3 Authority Key Identifier, if any. tmp = self.asn1parsecrl.split(":X509v3 Authority Key Identifier", 1) self.authorityKeyID = None if len(tmp) == 2: tmp = tmp[1] tmp = tmp.split("[HEX DUMP]:", 1)[1] self.authorityKeyID=tmp.split('\n',1)[0] # Parse the -text output of openssl to make things available tmp = self.textcrl.split('\n', 1)[1] l = tmp.split('\n', 1) if len(l) != 2: raise Exception(error_msg) cur, tmp = l i = 0 k = self.possible_fields[i] # Version cur = cur[len(k):] + '\n' while k: l = tmp.split('\n', 1) if len(l) != 2: # Over fields_dict[k] = cur break l, tmp = l newkey = 0 # skip fields we have already seen, this is the purpose of 'i' for j in range(i, self.possible_fields_count): f = self.possible_fields[j] if l.startswith(f): fields_dict[k] = cur cur = l[len(f):] + '\n' k = f newkey = 1 i = j+1 break if newkey == 1: continue cur += l + '\n' # version v = fields_dict[" Version"] self.version = None if v: self.version = int(v[1:2]) if self.version is None: raise Exception(error_msg) # signature algorithm v = fields_dict[" Signature Algorithm:"] self.sigAlg = None if v: v = v.split('\n',1)[0] v = v.strip() self.sigAlg = v if self.sigAlg is None: raise Exception(error_msg) # issuer v = fields_dict[" Issuer:"] self.issuer = None if v: v = v.split('\n',1)[0] v = v.strip() self.issuer = v if self.issuer is None: raise Exception(error_msg) # last update v = fields_dict[" Last Update:"] self.lastUpdate_str = None if v: v = v.split('\n',1)[0] v = v.strip() self.lastUpdate_str = v if self.lastUpdate_str is None: raise Exception(error_msg) self.lastUpdate = time.strptime(self.lastUpdate_str, "%b %d %H:%M:%S %Y %Z") self.lastUpdate_str_simple = time.strftime("%x", self.lastUpdate) # next update v = fields_dict[" Next Update:"] self.nextUpdate_str = None if v: v = v.split('\n',1)[0] v = v.strip() self.nextUpdate_str = v if self.nextUpdate_str is None: raise Exception(error_msg) self.nextUpdate = time.strptime(self.nextUpdate_str, "%b %d %H:%M:%S %Y %Z") self.nextUpdate_str_simple = time.strftime("%x", self.nextUpdate) # XXX Do something for Issuer Alternative Name # Authority Key Identifier: keyid, dirname and serial self.authorityKeyID_keyid = None self.authorityKeyID_dirname = None self.authorityKeyID_serial = None if self.authorityKeyID: # (hex version already done using asn1parse) v = fields_dict[" keyid:"] if v: v = v.split('\n',1)[0] v = v.strip().replace(':', '') self.authorityKeyID_keyid = v v = fields_dict[" DirName:"] if v: v = v.split('\n',1)[0] self.authorityKeyID_dirname = v v = fields_dict[" serial:"] if v: v = v.split('\n',1)[0] v = v.strip().replace(':', '') self.authorityKeyID_serial = v # number v = fields_dict[" X509v3 CRL Number:"] self.number = None if v: v = v.split('\n',2)[1] v = v.strip() self.number = int(v) # Get the list of serial numbers of revoked certificates self.revoked_cert_serials = [] v = fields_dict["Revoked Certificates:"] t = fields_dict["No Revoked Certificates."] if (t is None and v is not None): v = v.split("Serial Number: ")[1:] for r in v: s,d = r.split('\n', 1) s = s.split('\n', 1)[0] d = d.split("Revocation Date:", 1)[1] d = time.strptime(d.strip(), "%b %d %H:%M:%S %Y %Z") self.revoked_cert_serials.append((s,d)) # signature field v = fields_dict[" Signature Algorithm:" ] self.sig = None if v: v = v.split('\n',1)[1] v = v.replace(' ', '').replace('\n', '') self.sig = "".join(map(lambda x: chr(int(x, 16)), v.split(':'))) self.sigLen = len(self.sig) if self.sig is None: raise Exception(error_msg) def __str__(self): return self.dercrl # Print main informations stored in CRL def show(self): print("Version: %d" % self.version) print("sigAlg: " + self.sigAlg) print("Issuer: " + self.issuer) print("lastUpdate: %s" % self.lastUpdate_str_simple) print("nextUpdate: %s" % self.nextUpdate_str_simple) def verify(self, anchors): """ Return True if the CRL is signed by one of the provided anchors. False on error (invalid signature, missing anchorand, ...) """ cafile = create_temporary_ca_file(anchors) if cafile is None: return False try: cmd = self.osslcmdbase + ["-noout", "-CAfile", cafile] cmdres = self._apply_ossl_cmd(cmd, self.rawcrl) except: os.unlink(cafile) return False os.unlink(cafile) return "verify OK" in cmdres