commit 0fb43a5e36f40ae7fccd2128e5e0bbb401b3111a Author: Kon Date: Thu Jan 31 13:54:15 2013 -0500 Initial commit. diff --git a/src/MANIFEST b/src/MANIFEST new file mode 100755 index 00000000..755d8ab8 --- /dev/null +++ b/src/MANIFEST @@ -0,0 +1,30 @@ +quickstart.py +setup.py +simplejson/__init__.py +simplejson/decoder.py +simplejson/encoder.py +simplejson/ordered_dict.py +simplejson/scanner.py +simplejson/tool.py +tuf/__init__.py +tuf/checkjson.py +tuf/conf.py +tuf/download.py +tuf/formats.py +tuf/hash.py +tuf/keydb.py +tuf/keys.py +tuf/log.py +tuf/mirrors.py +tuf/sig.py +tuf/util.py +tuf/client/__init__.py +tuf/client/updater.py +tuf/pushtools/push.py +tuf/pushtools/receivetools/receive.py +tuf/pushtools/transfer/__init__.py +tuf/pushtools/transfer/scp.py +tuf/repo/__init__.py +tuf/repo/keystore.py +tuf/repo/signercli.py +tuf/repo/signerlib.py diff --git a/src/evpy/__init__.py b/src/evpy/__init__.py new file mode 100755 index 00000000..e69de29b diff --git a/src/evpy/cipher.py b/src/evpy/cipher.py new file mode 100755 index 00000000..de904067 --- /dev/null +++ b/src/evpy/cipher.py @@ -0,0 +1,200 @@ +#! /usr/bin/env python + +""" +cipher.py + +Written by Geremy Condra +Released on 18 March 2010 +Licensed under MIT License + +This module provides a basic interface to OpenSSL's EVP +cipher functions. + +All the functions in this module raise CipherError on +malfunction. + +From an end-user perspective, this module should be used +in situations where you want to have a single generally +human-readable or human-generated key used for both +encryption and decryption. + +This means that as a general rule, if your application +involves transmitting this key over an insecure channel +you should not be using this module, but rather +evpy.envelope. + +Usage: + + >>> from evpy import cipher + >>> message = b"this is data" + >>> pw = b"mypassword" + >>> salt, iv, enc = cipher.encrypt(message, pw) + >>> cipher.decrypt(salt, iv, enc, pw) + 'this is data' +""" + +import ctypes +import time + +import evp + + +class CipherError(evp.SSLError): + pass + +def _strengthen_password(pw, iv, salt=None): + # add the hash + evp.OpenSSL_add_all_digests() + # build the key buffer + key = ctypes.create_string_buffer(24) + # either take the existing salt or build a new one + if not salt: + salt = ctypes.create_string_buffer(8) + # get the needed entropy, bailing if it doesn't work in + # the first thousand tries + for i in range(1000): + if evp.RAND_bytes(salt, 8): break + else: + raise CipherError("Could not generate enough entropy") + # extract the salt + salt = salt.raw + # get the hash + evp_hash = evp.EVP_get_digestbyname("sha512") + if not evp_hash: + raise CipherError("Could not create hash object") + # fill the key + if not evp.EVP_BytesToKey(evp.EVP_aes_192_cbc(), evp_hash, salt, pw, len(pw), 1000, key, iv): + raise CipherError("Could not strengthen key") + # go home + return salt, key.raw + + +def encrypt(data, password): + """Encrypts the given data, raising CipherError on failure. + + This uses AES192 to encrypt and strengthens the given + passphrase using SHA512. + + Usage: + >>> from evpy import cipher + >>> f = open("test/short.txt", "rb") + >>> data = f.read() + >>> pw = b"mypassword" + >>> salt, iv, enc = cipher.encrypt(data, pw) + >>> cipher.decrypt(salt, iv, enc, pw) == data + True + """ + # ensure data exists + if not len(data): + raise CipherError("Data must actually exist") + if not len(password): + raise CipherError("Password must actually exist") + + # build and initialize the context + ctx = evp.EVP_CIPHER_CTX_new() + if not ctx: + raise CipherError("Could not create context") + evp.EVP_CIPHER_CTX_init(ctx) + + # get the cipher object + cipher_object = evp.EVP_aes_192_cbc() + if not cipher_object: + raise CipherError("Could not create cipher object") + + # finish the context and cipher object + if not evp.EVP_EncryptInit_ex(ctx, cipher_object, None, None, None): + raise CipherError("Could not finish context") + + # build the randomized iv + iv_length = evp.EVP_CIPHER_CTX_iv_length(ctx) + iv = ctypes.create_string_buffer(iv_length) + # get the needed entropy, bailing if it doesn't work in + # the first thousand tries + for i in range(1000): + if evp.RAND_bytes(iv, iv_length): break + else: + raise CipherError("Not enough entropy for IV") + output_iv = iv.raw + + # strengthen the password into an honest-to-goodness key + salt, aes_key = _strengthen_password(password, iv) + + # initialize the encryption operation + if not evp.EVP_EncryptInit_ex(ctx, None, None, aes_key, iv): + raise CipherError("Could not start encryption operation") + + # build the output buffer + buf = ctypes.create_string_buffer(len(data) + 16) + written = ctypes.c_int(0) + final = ctypes.c_int(0) + + # update + if not evp.EVP_EncryptUpdate(ctx, buf, ctypes.byref(written), data, len(data)): + raise CipherError("Could not update ciphertext") + output = buf.raw[:written.value] + + # finalize + if not evp.EVP_EncryptFinal_ex(ctx, buf, ctypes.byref(final)): + raise CipherError("Could not finalize ciphertext") + output += buf.raw[:final.value] + + # ...and go home + return salt, output_iv, output + + +def decrypt(salt, iv, data, password): + """Decrypts the given data, raising CipherError on failure. + + Usage: + >>> from evpy import cipher + >>> f = open("test/short.txt", "rb") + >>> data = f.read() + >>> pw = b"mypassword" + >>> salt, iv, enc = cipher.encrypt(data, pw) + >>> cipher.decrypt(salt, iv, enc, pw) == data + True + """ + # ensure inputs are the correct size + if not len(data): + raise CipherError("Data must actually exist") + if not len(password): + raise CipherError("Password must actually exist") + if len(salt) != 8: + raise CipherError("Incorrect salt size") + if len(iv) != 16: + raise CipherError("Incorrect iv size") + + # build and initialize the context + ctx = evp.EVP_CIPHER_CTX_new() + if not ctx: + raise CipherError("Could not create context") + evp.EVP_CIPHER_CTX_init(ctx) + + # get the cipher object + cipher_object = evp.EVP_aes_192_cbc() + if not cipher_object: + raise CipherError("Could not create cipher object") + + # build the key + salt, key = _strengthen_password(password, iv, salt) + + # start decrypting the ciphertext + if not evp.EVP_DecryptInit_ex(ctx, cipher_object, None, key, iv): + raise CipherError("Could not open envelope") + + # build the output buffers + buf = ctypes.create_string_buffer(len(data) + 16) + written = ctypes.c_int(0) + final = ctypes.c_int(0) + + # update + if not evp.EVP_DecryptUpdate(ctx, buf, ctypes.byref(written), data, len(data)): + raise CipherError("Could not update plaintext") + output = buf.raw[:written.value] + + # finalize + if not evp.EVP_DecryptFinal_ex(ctx, buf, ctypes.byref(final)): + raise CipherError("Could not finalize decryption") + output += buf.raw[:final.value] + + return output diff --git a/src/evpy/envelope.py b/src/evpy/envelope.py new file mode 100755 index 00000000..16ee8f68 --- /dev/null +++ b/src/evpy/envelope.py @@ -0,0 +1,326 @@ +#! /usr/bin/env python + +""" +envelope.py + +Written by Geremy Condra +Released on 18 March 2010 +Licensed under MIT License + +This module provides a basic interface to OpenSSL's EVP +envelope functions. + +In a nutshell, these functions are designed to provide +the primary benefit of public key cryptography (the +ability to provide secrecy without first sharing a +secret) without its primary downside (small message +length). It does this by generating a random AES key +with which to encrypt the data, then encrypting that +key against the provided RSA key. + +This means that if you have an application in which +you wish to share sensitive data but do not wish to +share a common secret, this is your module. Be aware +that compromising your private key is effectively +game over with this scheme. + +If you require a shared secret and want the key to +be human readable, then you will probably want to +use the cipher module instead. + +All the functions in this module raise EnvelopeError on +malfunction. + +Usage: + + >>> from evpy import envelope + >>> f = open("test/short.txt", "rb") + >>> data = f.read() + >>> public_key = "test/keys/public1.pem" + >>> private_key = "test/keys/private1.pem" + >>> iv, key, ciphertext = envelope.encrypt(data, public_key) + >>> envelope.decrypt(iv, key, ciphertext, private_key) == data + True +""" + +import ctypes + +import evp +from signature import _string_to_bio + +class EnvelopeError(evp.SSLError): + pass + +def _build_dkey_from_file(keyfile): + fp = evp.fopen(keyfile, "r") + if not fp: + raise EnvelopeError("Could not open keyfile") + # get the decryption key + skey = evp.PEM_read_PrivateKey(fp, None, None, None) + if not skey: + evp.fclose(fp) + raise EnvelopeError("Could not read decryption key") + # close the file + evp.fclose(fp) + return skey + +def _build_dkey_from_string(key): + bio = _string_to_bio(key) + dkey = evp.PEM_read_bio_PrivateKey(bio, None, None, None) + if not dkey: + raise EnvelopeError("Could not build decryption key from string") + evp.BIO_free(bio) + return dkey + +def _build_ekey_from_file(keyfile): + fp = evp.fopen(keyfile, "r") + if not fp: + raise EnvelopeError("Could not open keyfile") + # get the encryption key + ekey = evp.PEM_read_PUBKEY(fp, None, None, None) + if not ekey: + evp.fclose(fp) + raise EnvelopeError("Could not read encryption key") + # close the file + evp.fclose(fp) + return ekey + +def _build_ekey_from_string(key): + bio = _string_to_bio(key) + ekey = evp.PEM_read_bio_PUBKEY(bio, None, None, None) + if not ekey: + raise EnvelopeError("Could not create encryption key from string") + evp.BIO_free(bio) + return ekey + +def _build_bio(): + method = evp.BIO_s_mem() + return evp.BIO_new(method); + +def _asn1_hex_to_int(value): + print(value) + return int(''.join(value.split(':')), 16) + +def _parse_printed_key(k): + attrs = {} + current = "" + current_attr = "" + for line in k.splitlines()[1:]: + # its a continuation of the current block + if line.startswith(' '): + current += line.strip() + else: + # special case the public exponent + if "publicExponent" in current_attr: + attrs['publicExponent'] = int(current_attr.split()[1]) + elif current_attr: + attrs[current_attr] = _asn1_hex_to_int(current) + current_attr = line.strip(':') + current = "" + translator = {'publicExponent': 'e', 'privateExponent': 'd', 'modulus': 'n', 'prime1': 'p', 'prime2': 'q'} + translated_attrs = {} + for key, value in attrs.items(): + try: + translated_attrs[translator[key]] = value + except: pass + return translated_attrs + + +def keygen(bitlength=1024, e=65537, pem=True): + key = evp.RSA_generate_key(bitlength, e, None, None) + if not key: + raise EnvelopeError("Could not generate key") + if pem: + private_bio = evp.BIO_new(evp.BIO_s_mem()) + if not private_bio: + raise KeygenError("Could not create temporary storage") + public_bio = evp.BIO_new(evp.BIO_s_mem()) + if not public_bio: + raise KeygenError("Could not create temporary storage") + private_buf = ctypes.create_string_buffer('', 65537) + if not private_buf: + raise MemoryError("Could not allocate key storage") + public_buf = ctypes.create_string_buffer('', 65537) + if not public_buf: + raise MemoryError("Could not allocate key storage") + if not evp.PEM_write_bio_RSAPrivateKey(private_bio, key, None, None, 0, 0, None): + raise KeygenError("Could not write private key") + if not evp.PEM_write_bio_RSA_PUBKEY(public_bio, key): + raise KeygenError("Could not write public key") + public_len = evp.BIO_read(public_bio, public_buf, 65537) + private_len = evp.BIO_read(private_bio, private_buf, 65537) + evp.BIO_free(public_bio) + evp.BIO_free(private_bio) + return public_buf.value, private_buf.value + else: + # we go through this rigamarole because if there's an engine + # in place it won't populate the RSA key's values properly. + key_bio = evp.BIO_new(evp.BIO_s_mem()) + if not key_bio: + raise KeygenError("Could not create temporary storage") + if not evp.RSA_print(key_bio, key, 0): + raise KeygenError("Could not stringify key") + key_buf = ctypes.create_string_buffer('', 65537) + if not key_buf: + raise MemoryError("Could not allocate key storage") + evp.BIO_read(key_bio, key_buf, 65537) + evp.BIO_free(key_bio) + key_string = key_buf.value + return key, _parse_printed_key(key_string) + + + + +def encrypt(data, keyfile=None, key=None): + """Encrypts the given data, raising EnvelopeError on failure. + + This uses AES192 to do bulk encryption and RSA to encrypt + the given public key. + + Usage: + >>> from evpy import envelope + >>> f = open("test/short.txt", "rb") + >>> data = f.read() + >>> public_key = "test/keys/public1.pem" + >>> private_key = "test/keys/private1.pem" + >>> iv, key, ciphertext = envelope.encrypt(data, public_key) + >>> envelope.decrypt(iv, key, ciphertext, private_key) == data + True + """ + # validate the incoming data + if not data: + raise EnvelopeError("Incoming data must be bytes") + if not len(data): + raise EnvelopeError("Data must actually exist") + + # build and initialize the context + ctx = evp.EVP_CIPHER_CTX_new() + if not ctx: + raise EnvelopeError("Could not create context") + evp.EVP_CIPHER_CTX_init(ctx) + + # get the key from the keyfile + if key and not keyfile: + ekey = _build_ekey_from_string(key) + elif keyfile and not key: + ekey = _build_ekey_from_file(keyfile) + else: + raise EnvelopeError("Must specify exactly one key or keyfile") + + # get the cipher object + cipher_object = evp.EVP_aes_192_cbc() + if not cipher_object: + raise EnvelopeError("Could not create cipher object") + + # finish the context and cipher object + if not evp.EVP_EncryptInit_ex(ctx, cipher_object, None, None, None): + raise EnvelopeError("Could not finish context") + + # build the randomized iv + iv_length = evp.EVP_CIPHER_CTX_iv_length(ctx) + iv = ctypes.create_string_buffer(iv_length) + for i in range(1000): + if evp.RAND_bytes(iv, iv_length): break + else: + raise EnvelopeError("Could not generate enough entropy for IV") + output_iv = iv.raw + + # build the randomized AES key + keysize = evp.EVP_CIPHER_key_length(cipher_object) + aes_key = ctypes.create_string_buffer(keysize) + for i in range(1000): + if evp.RAND_bytes(aes_key, keysize): break + else: + raise EnvelopeError("Could not generate enough entropy for AES key") + + # extract the RSA key + rsa_key = evp.EVP_PKEY_get1_RSA(ekey) + if not rsa_key: + raise EnvelopeError("Could not get RSA key") + + # encrypt it + buf_size = evp.RSA_size(rsa_key) + if not buf_size: + raise EnvelopeError("Invalid RSA keysize") + encrypted_aes_key = ctypes.create_string_buffer(buf_size) + # RSA_PKCS1_PADDING is defined as 1 + written = evp.RSA_public_encrypt(keysize, aes_key, encrypted_aes_key, rsa_key, 1) + if not written: + raise EnvelopeError("Could not encrypt AES key") + output_key = encrypted_aes_key.raw[:written] + + # initialize the encryption operation + if not evp.EVP_EncryptInit_ex(ctx, None, None, aes_key, iv): + raise EnvelopeError("Could not start encryption operation") + + # build the output buffer + buf = ctypes.create_string_buffer(len(data) + 16) + written = ctypes.c_int(0) + final = ctypes.c_int(0) + + # update + if not evp.EVP_EncryptUpdate(ctx, buf, ctypes.byref(written), data, len(data)): + raise EnvelopeError("Could not update ciphertext") + output = buf.raw[:written.value] + + # finalize + if not evp.EVP_EncryptFinal_ex(ctx, buf, ctypes.byref(final)): + raise EnvelopeError("Could not finalize ciphertext") + output += buf.raw[:final.value] + + # ...and go home + return output_iv, output_key, output + + +def decrypt(iv, encrypted_aes_key, data, keyfile=None, key=None): + """Decrypts the given ciphertext, raising EnvelopeError on failure. + + Usage: + >>> from evpy import envelope + >>> f = open("test/short.txt", "rb") + >>> data = f.read() + >>> public_key = "test/keys/public1.pem" + >>> private_key = "test/keys/private1.pem" + >>> iv, key, ciphertext = envelope.encrypt(data, public_key) + >>> envelope.decrypt(iv, key, ciphertext, private_key) == data + True + """ + # build and initialize the context + ctx = evp.EVP_CIPHER_CTX_new() + if not ctx: + raise EnvelopeError("Could not create context") + evp.EVP_CIPHER_CTX_init(ctx) + + # get the cipher object + cipher_object = evp.EVP_aes_192_cbc() + if not cipher_object: + raise EnvelopeError("Could not create cipher object") + + # get the key from the keyfile + if key and not keyfile: + dkey = _build_dkey_from_string(key) + elif keyfile and not key: + dkey = _build_dkey_from_file(keyfile) + else: + raise EnvelopeError("Must specify exactly one key or keyfile") + + # open the envelope + if not evp.EVP_OpenInit(ctx, cipher_object, encrypted_aes_key, len(encrypted_aes_key), iv, dkey): + raise EnvelopeError("Could not open envelope") + + # build the output buffer + buf = ctypes.create_string_buffer(len(data) + 16) + written = ctypes.c_int(0) + final = ctypes.c_int(0) + + # update + if not evp.EVP_DecryptUpdate(ctx, buf, ctypes.byref(written), data, len(data)): + raise EnvelopeError("Could not update envelope") + output = buf.raw[:written.value] + + # finalize + if not evp.EVP_DecryptFinal_ex(ctx, buf, ctypes.byref(final)): + raise EnvelopeError("Could not finalize envelope") + output += buf.raw[:final.value] + + return output diff --git a/src/evpy/evp.py b/src/evpy/evp.py new file mode 100755 index 00000000..55988767 --- /dev/null +++ b/src/evpy/evp.py @@ -0,0 +1,318 @@ +#! /usr/bin/env python + +import ctypes +import ctypes.util +import platform +from os import linesep + +def handle_errors(): + ERR_load_crypto_strings() + errno = ERR_get_error() + errbuf = ctypes.create_string_buffer(1024) + ERR_error_string_n(errno, errbuf, 1024) + return errbuf.value.decode("ascii") + +class SSLError(Exception): + def __init__(self, msg): + sslerr = handle_errors() + Exception.__init__(self, msg + linesep + sslerr) + +libraries = {} + +libraries["c"] = ctypes.CDLL(ctypes.util.find_library("c")) + +if platform.system() != "Windows": + libraries["ssl"] = ctypes.CDLL(ctypes.util.find_library("ssl")) +else: + libraries["ssl"] = ctypes.CDLL(ctypes.util.find_library("libeay32")) + + +class RSA(ctypes.Structure): + _fields_ = [ ("n", ctypes.c_void_p), + ("e", ctypes.c_void_p), + ("d", ctypes.c_void_p), + ("p", ctypes.c_void_p), + ("q", ctypes.c_void_p), + ("dmp1", ctypes.c_void_p), + ("iqmp", ctypes.c_void_p)] + +def handle_errors(): + ERR_load_crypto_strings() + errno = ERR_get_error() + errbuf = ctypes.create_string_buffer(1024) + ERR_error_string_n(errno, errbuf, 1024) + return errbuf.value.decode("ascii") + + +fopen = libraries['c'].fopen +fopen.restype = ctypes.c_void_p +fopen.argtypes = [ctypes.c_char_p, ctypes.c_char_p] + + +fclose = libraries['c'].fclose +fclose.restype = ctypes.c_int +fclose.argtypes = [ctypes.c_void_p] + + +ERR_load_crypto_strings = libraries['ssl'].ERR_load_crypto_strings +ERR_load_crypto_strings.restype = ctypes.c_int +ERR_load_crypto_strings.argtypes = [] + + +ERR_print_errors_fp = libraries['ssl'].ERR_print_errors_fp +ERR_print_errors_fp.restype = ctypes.c_int +ERR_print_errors_fp.argtypes = [ctypes.c_void_p] + + +OpenSSL_add_all_digests = libraries['ssl'].OpenSSL_add_all_digests +OpenSSL_add_all_digests.restype = ctypes.c_int +OpenSSL_add_all_digests.argtypes = [] + + +EVP_MD_CTX_create = libraries['ssl'].EVP_MD_CTX_create +EVP_MD_CTX_create.restype = ctypes.c_void_p +EVP_MD_CTX_create.argtypes = [] + + +PEM_read_PrivateKey = libraries['ssl'].PEM_read_PrivateKey +PEM_read_PrivateKey.restype = ctypes.c_void_p +PEM_read_PrivateKey.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p] + + +PEM_read_X509 = libraries['ssl'].PEM_read_X509 +PEM_read_X509.restype = ctypes.c_void_p +PEM_read_X509.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p] + + +PEM_read_PUBKEY = libraries['ssl'].PEM_read_PUBKEY +PEM_read_PUBKEY.restype = ctypes.c_void_p +PEM_read_PUBKEY.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p] + + +PEM_read_bio_PUBKEY = libraries['ssl'].PEM_read_bio_PUBKEY +PEM_read_bio_PUBKEY.restype = ctypes.c_void_p +PEM_read_bio_PUBKEY.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p] + + +BIO_free = libraries['ssl'].BIO_free +BIO_free.restype = ctypes.c_int +BIO_free.argtypes = [ctypes.c_void_p] + + +PEM_read_bio_PrivateKey = libraries['ssl'].PEM_read_bio_PrivateKey +PEM_read_bio_PrivateKey.restype = ctypes.c_void_p +PEM_read_bio_PrivateKey.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p] + + +EVP_get_digestbyname = libraries['ssl'].EVP_get_digestbyname +EVP_get_digestbyname.restype = ctypes.c_void_p +EVP_get_digestbyname.argtypes = [ctypes.c_char_p] + + +EVP_DigestInit = libraries['ssl'].EVP_DigestInit +EVP_DigestInit.restype = ctypes.c_int +EVP_DigestInit.argtypes = [ctypes.c_void_p, ctypes.c_void_p] + + +EVP_DigestUpdate = libraries['ssl'].EVP_DigestUpdate +EVP_DigestUpdate.restype = ctypes.c_int +EVP_DigestUpdate.argtypes = [ctypes.c_void_p, ctypes.c_char_p, ctypes.c_int] + + +EVP_SignFinal = libraries['ssl'].EVP_SignFinal +EVP_SignFinal.restype = ctypes.c_int +EVP_SignFinal.argtypes = [ctypes.c_void_p, ctypes.c_char_p, ctypes.POINTER(ctypes.c_int), ctypes.c_void_p] + + +EVP_VerifyFinal = libraries['ssl'].EVP_VerifyFinal +EVP_VerifyFinal.restype = ctypes.c_int +EVP_VerifyFinal.argtypes = [ctypes.c_void_p, ctypes.c_char_p, ctypes.c_int, ctypes.c_void_p] + + +EVP_PKEY_free = libraries['ssl'].EVP_PKEY_free +EVP_PKEY_free.restype = ctypes.c_int +EVP_PKEY_free.argtypes = [ctypes.c_void_p] + + +EVP_MD_CTX_cleanup = libraries['ssl'].EVP_MD_CTX_cleanup +EVP_MD_CTX_cleanup.restype = ctypes.c_int +EVP_MD_CTX_cleanup.argtypes = [ctypes.c_void_p] + + +EVP_MD_CTX_destroy = libraries['ssl'].EVP_MD_CTX_destroy +EVP_MD_CTX_destroy.restype = ctypes.c_int +EVP_MD_CTX_destroy.argtypes = [ctypes.c_void_p] + + +EVP_CIPHER_CTX_new = libraries['ssl'].EVP_CIPHER_CTX_new +EVP_CIPHER_CTX_new.restype = ctypes.c_void_p +EVP_CIPHER_CTX_new.argtypes = [] + + +EVP_CIPHER_CTX_init = libraries['ssl'].EVP_CIPHER_CTX_init +EVP_CIPHER_CTX_init.restype = ctypes.c_int +EVP_CIPHER_CTX_init.argtypes = [ctypes.c_void_p] + +try: + EVP_CIPHER_CTX_iv_length = libraries['ssl'].EVP_CIPHER_CTX_iv_length + EVP_CIPHER_CTX_iv_length.restype = ctypes.c_int + EVP_CIPHER_CTX_iv_length.argtypes = [ctypes.c_void_p] +except: + EVP_CIPHER_CTX_iv_length = lambda(x): 16 + + +EVP_aes_192_cbc = libraries['ssl'].EVP_aes_192_cbc +EVP_aes_192_cbc.restype = ctypes.c_void_p +EVP_aes_192_cbc.argtypes = [] + + +EVP_PKEY_size = libraries['ssl'].EVP_PKEY_size +EVP_PKEY_size.restype = ctypes.c_int +EVP_PKEY_size.argtypes = [ctypes.c_void_p] + + +EVP_SealInit = libraries['ssl'].EVP_SealInit +EVP_SealInit.restype = ctypes.c_int +EVP_SealInit.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_char_p, ctypes.POINTER(ctypes.c_int), ctypes.c_char_p, ctypes.c_void_p, ctypes.c_int] + + +EVP_EncryptInit_ex = libraries['ssl'].EVP_EncryptInit_ex +EVP_EncryptInit_ex.restype = ctypes.c_int +EVP_EncryptInit_ex.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_char_p, ctypes.c_char_p] + + +EVP_EncryptUpdate = libraries['ssl'].EVP_EncryptUpdate +EVP_EncryptUpdate.restype = ctypes.c_int +EVP_EncryptUpdate.argtypes = [ctypes.c_void_p, ctypes.c_char_p, ctypes.POINTER(ctypes.c_int), ctypes.c_char_p, ctypes.c_int] + + +EVP_DecryptUpdate = libraries['ssl'].EVP_DecryptUpdate +EVP_DecryptUpdate.restype = ctypes.c_int +EVP_DecryptUpdate.argtypes = [ctypes.c_void_p, ctypes.c_char_p, ctypes.POINTER(ctypes.c_int), ctypes.c_char_p, ctypes.c_int] + + +EVP_EncryptFinal_ex = libraries['ssl'].EVP_EncryptFinal_ex +EVP_EncryptFinal_ex.restype = ctypes.c_int +EVP_EncryptFinal_ex.argtypes = [ctypes.c_void_p, ctypes.c_char_p, ctypes.POINTER(ctypes.c_int)] + + +EVP_SealFinal = libraries['ssl'].EVP_SealFinal +EVP_SealFinal.restype = ctypes.c_int +EVP_SealFinal.argtypes = [ctypes.c_void_p, ctypes.c_char_p, ctypes.POINTER(ctypes.c_int)] + + +EVP_DecryptFinal_ex = libraries['ssl'].EVP_DecryptFinal_ex +EVP_DecryptFinal_ex.restype = ctypes.c_int +EVP_DecryptFinal_ex.argtypes = [ctypes.c_void_p, ctypes.c_char_p, ctypes.POINTER(ctypes.c_int)] + + +RAND_bytes = libraries['ssl'].RAND_bytes +RAND_bytes.restype = ctypes.c_int +RAND_bytes.argtypes = [ctypes.c_char_p, ctypes.c_int] + + +BIO_new_mem_buf = libraries['ssl'].BIO_new_mem_buf +BIO_new_mem_buf.restype = ctypes.c_void_p +BIO_new_mem_buf.argtypes = [ctypes.c_char_p, ctypes.c_int] + + +EVP_CIPHER_CTX_rand_key = libraries['ssl'].EVP_CIPHER_CTX_rand_key +EVP_CIPHER_CTX_rand_key.restype = ctypes.c_int +EVP_CIPHER_CTX_rand_key.argtypes = [ctypes.c_void_p, ctypes.c_char_p] + + +try: + EVP_CIPHER_key_length = libraries['ssl'].EVP_CIPHER_key_length + EVP_CIPHER_key_length.restype = ctypes.c_int + EVP_CIPHER_key_length.argtypes = [ctypes.c_void_p] +except: + EVP_CIPHER_key_length = lambda(x): 24 + + +EVP_PKEY_encrypt = libraries['ssl'].EVP_PKEY_encrypt +EVP_PKEY_encrypt.restype = ctypes.c_int +EVP_PKEY_encrypt.argtypes = [ctypes.c_char_p, ctypes.c_char_p, ctypes.c_int, ctypes.c_void_p] + + +EVP_OpenInit = libraries['ssl'].EVP_OpenInit +EVP_OpenInit.restype = ctypes.c_int +EVP_OpenInit.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_char_p, ctypes.c_int, ctypes.c_char_p, ctypes.c_void_p] + + +EVP_PKEY_size = libraries['ssl'].EVP_PKEY_size +EVP_PKEY_size.restype = ctypes.c_int +EVP_PKEY_size.argtypes = [ctypes.c_void_p] + + +RSA_public_encrypt = libraries['ssl'].RSA_public_encrypt +RSA_public_encrypt.restype = ctypes.c_int +RSA_public_encrypt.argtypes = [ctypes.c_int, ctypes.c_char_p, ctypes.c_char_p, ctypes.c_void_p, ctypes.c_int] + + +EVP_PKEY_get1_RSA = libraries['ssl'].EVP_PKEY_get1_RSA +EVP_PKEY_get1_RSA.restype = ctypes.c_void_p +EVP_PKEY_get1_RSA.argtypes = [ctypes.c_void_p] + + +EVP_DecryptInit_ex = libraries['ssl'].EVP_DecryptInit_ex +EVP_DecryptInit_ex.restype = ctypes.c_int +EVP_DecryptInit_ex.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_char_p, ctypes.c_char_p] + + +EVP_CIPHER_CTX_set_key_length = libraries['ssl'].EVP_CIPHER_CTX_set_key_length +EVP_CIPHER_CTX_set_key_length.restype = ctypes.c_int +EVP_CIPHER_CTX_set_key_length.argtypes = [ctypes.c_void_p, ctypes.c_int] + + +EVP_BytesToKey = libraries['ssl'].EVP_BytesToKey +EVP_BytesToKey.restype = ctypes.c_int +EVP_BytesToKey.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_char_p, ctypes.c_char_p, ctypes.c_int, ctypes.c_int, ctypes.c_char_p, ctypes.c_char_p] + + +ERR_get_error = libraries['ssl'].ERR_get_error +ERR_get_error.restype = ctypes.c_long +ERR_get_error.argtypes = [] + + +ERR_error_string_n = libraries['ssl'].ERR_error_string_n +ERR_error_string_n.restype = ctypes.c_void_p +ERR_error_string_n.argtypes = [ctypes.c_long, ctypes.c_char_p, ctypes.c_int] + +RSA_size = libraries['ssl'].RSA_size +RSA_size.restype = ctypes.c_int +RSA_size.argtypes = [ctypes.c_void_p] + +RSA_new = libraries['ssl'].RSA_new +RSA_new.restype = ctypes.POINTER(RSA) + +RSA_generate_key = libraries['ssl'].RSA_generate_key +RSA_generate_key.restype = ctypes.POINTER(RSA) +RSA_generate_key.argtypes = [ctypes.c_int, ctypes.c_ulong, ctypes.c_void_p, ctypes.c_void_p] + +RSA_print = libraries['ssl'].RSA_print +RSA_print.restype = ctypes.c_int +RSA_print.argtypes = [ctypes.c_void_p, ctypes.POINTER(RSA), ctypes.c_int] + +PEM_write_bio_RSA_PUBKEY = libraries['ssl'].PEM_write_bio_RSA_PUBKEY + +PEM_write_bio_RSAPrivateKey = libraries['ssl'].PEM_write_bio_RSAPrivateKey +PEM_write_bio_RSAPrivateKey.restype = ctypes.c_int +PEM_write_bio_RSAPrivateKey.argtypes = [ctypes.c_void_p, ctypes.POINTER(RSA), ctypes.c_void_p, ctypes.c_char_p, ctypes.c_int, ctypes.c_void_p, ctypes.c_void_p] + +PEM_write_RSAPrivateKey = libraries['ssl'].PEM_write_RSAPrivateKey +PEM_write_RSAPrivateKey.restype = ctypes.c_int +PEM_write_RSAPrivateKey.argtypes = [ctypes.c_void_p, ctypes.POINTER(RSA), ctypes.c_void_p, ctypes.c_char_p, ctypes.c_int, ctypes.c_void_p, ctypes.c_void_p] + +i2d_RSAPrivateKey = libraries['ssl'].i2d_RSAPrivateKey + +BIO_read = libraries['ssl'].BIO_read + +BIO_s_mem = libraries['ssl'].BIO_s_mem +BIO_s_mem.restype = ctypes.c_void_p + +BIO_new = libraries['ssl'].BIO_new +BIO_new.restype = ctypes.c_void_p +BIO_new.argtypes = [ctypes.c_void_p] + +RAND_seed = libraries['ssl'].RAND_seed +RAND_seed.argtypes = [ctypes.c_void_p, ctypes.c_int] diff --git a/src/evpy/signature.py b/src/evpy/signature.py new file mode 100755 index 00000000..29a6beac --- /dev/null +++ b/src/evpy/signature.py @@ -0,0 +1,215 @@ +#! /usr/bin/env python + +""" +signature.py + +Written by Geremy Condra +Released on 18 March 2010 +Licensed under MIT License + +This module provides a basic interface to OpenSSL's EVP +signature functions. + +All functions in this module will raise a SignatureError +in the event of a malfunction. + +The goal of cryptographic signatures is to provide some +degree of assurance that the data you are processing is +both coming from the person you think is sending it and +is what they sent. + +Note that this does not encrypt data in the sense that +it does not provide secrecy for it, while evpy.cipher +and evpy.envelope provide secrecy but no other security +properties. + +Usage: + >>> from evpy import signature + >>> data = b"abcdefg" + >>> public_key = "test/keys/public1.pem" + >>> private_key = "test/keys/private1.pem" + >>> s = signature.sign(data, private_key) + >>> signature.verify(data, s, public_key) + True +""" + +import ctypes + +import evp + + +class SignatureError(evp.SSLError): + pass + + +def sign(data, keyfile=None, key=None): + """Signs the given data, raising SignatureError on failure. + + Exactly one of keyfile, key should be given; if key is not + defined, then the key will be read from the given file. + + Usage: + >>> from evpy import signature + >>> f = open("test/short.txt", "rb") + >>> data = f.read() + >>> public_key = "test/keys/public1.pem" + >>> private_key = "test/keys/private1.pem" + >>> s = signature.sign(data, private_key) + >>> signature.verify(data, s, public_key) + True + """ + # add the digests + evp.OpenSSL_add_all_digests() + + # build the context + ctx = evp.EVP_MD_CTX_create() + if not ctx: + raise SignatureError("Could not create context") + + # get the signing key + if key and not keyfile: + skey = _build_skey_from_string(key) + elif keyfile and not key: + skey = _build_skey_from_file(keyfile) + else: + raise SignatureError("Exactly one of key, keyfile must be specified") + + # build the hash object + evp_hash = _build_hash() + if not evp.EVP_DigestInit(ctx, evp_hash): + _cleanup(skey, ctx) + raise SignatureError("Could not initialize signature") + + # update + if not evp.EVP_DigestUpdate(ctx, data, len(data)): + _cleanup(skey, ctx) + raise SignatureError("Could not update signature") + + # finalize + output_buflen = ctypes.c_int(evp.EVP_PKEY_size(skey)) + output = ctypes.create_string_buffer(output_buflen.value) + if not evp.EVP_SignFinal(ctx, output, ctypes.byref(output_buflen), skey): + _cleanup(skey, ctx) + raise SignatureError("Could not finalize signature") + + # cleanup + _cleanup(skey, ctx) + + # and go home + return ctypes.string_at(output, output_buflen) + + +def verify(data, sig, keyfile=None, key=None): + """Verifies the given signature, returning a boolean. + + Exactly one of keyfile, key should be specified. + + This function raises SignatureError on error. + + Usage: + >>> from evpy import signature + >>> f = open("test/short.txt", "rb") + >>> data = f.read() + >>> public_key = "test/keys/public1.pem" + >>> private_key = "test/keys/private1.pem" + >>> s = signature.sign(data, private_key) + >>> signature.verify(data, s, public_key) + True + """ + # add the digests + evp.OpenSSL_add_all_digests() + + # build the context + ctx = evp.EVP_MD_CTX_create() + if not ctx: + raise SignatureError("Could not create context") + + # get the vkey + if key and not keyfile: + vkey = _build_vkey_from_string(key) + elif keyfile and not key: + vkey = _build_vkey_from_file(keyfile) + else: + raise SignatureError("Exactly one of key, keyfile must be specified") + + # build the hash object + evp_hash = _build_hash() + if not evp.EVP_DigestInit(ctx, evp_hash): + _cleanup(vkey, ctx) + raise SignatureError("Could not initialize verifier") + + # update + if not evp.EVP_DigestUpdate(ctx, data, len(data)): + _cleanup(vkey, ctx) + raise SignatureError("Could not update verifier") + + # finalize + retcode = evp.EVP_VerifyFinal(ctx, sig, len(sig), vkey) + + # cleanup + _cleanup(vkey, ctx) + + # and go home + if retcode == 1: + return True + elif retcode == 0: + return False + else: + raise SignatureError("Error verifying signature") + +def _cleanup(key, ctx): + evp.EVP_PKEY_free(key) + evp.EVP_MD_CTX_cleanup(ctx) + evp.EVP_MD_CTX_destroy(ctx) + +def _string_to_bio(s): + return evp.BIO_new_mem_buf(s, len(s)) + +def _build_skey_from_file(keyfile): + fp = evp.fopen(keyfile, "r") + if not fp: + raise SignatureError("Could not open keyfile") + # get the signing key + skey = evp.PEM_read_PrivateKey(fp, None, None, None) + if not skey: + evp.fclose(fp) + raise SignatureError("Could not read signing key") + # close the file + evp.fclose(fp) + return skey + +def _build_skey_from_string(key): + buf = ctypes.create_string_buffer(key) + bio = evp.BIO_new_mem_buf(buf, len(buf.value)) + skey = evp.PEM_read_bio_PrivateKey(bio, None, None, None, None) + if not skey: + raise SignatureError("Could not construct signing key from the given string") + evp.BIO_free(bio) + return skey + +def _build_vkey_from_file(keyfile): + fp = evp.fopen(keyfile, "r") + if not fp: + raise SignatureError("Could not open keyfile") + # get the verification key + vkey = evp.PEM_read_PUBKEY(fp, None, None, None) + if not vkey: + evp.fclose(fp) + raise SignatureError("Could not read verification key") + # close the file + evp.fclose(fp) + return vkey + +def _build_vkey_from_string(key): + buf = ctypes.create_string_buffer(key) + bio = evp.BIO_new_mem_buf(buf, len(buf.value)) + vkey = evp.PEM_read_bio_PUBKEY(bio, None, None, None) + if not vkey: + raise SignatureError("Could not construct verification key from the given string") + return vkey + +def _build_hash(): + evp_hash = evp.EVP_get_digestbyname("sha512") + if not evp_hash: + raise SignatureError("Could not create hash object") + return evp_hash diff --git a/src/evpy/test.py b/src/evpy/test.py new file mode 100755 index 00000000..f9d39f1e --- /dev/null +++ b/src/evpy/test.py @@ -0,0 +1,668 @@ +#! /usr/bin/env python + +""" +test.py + +Written by Geremy Condra +Licensed under MIT License +Released 21 March 2010 + +Simple unit tests for evpy +""" + +import unittest + +from evpy import evp +from evpy import cipher +from evpy import signature +from evpy import envelope + +# locations for test data +TEST_DATA_DIR = "test/" +TEST_KEYS = TEST_DATA_DIR + "keys/" + +# test text +STRING = open(TEST_DATA_DIR + "long.txt").read() +LONG = open(TEST_DATA_DIR + "long.txt", 'rb').read() +SHORT = open(TEST_DATA_DIR + "short.txt", 'rb').read() +UNICODE = open(TEST_DATA_DIR + "unicode.txt", 'rb').read() +NULL = open(TEST_DATA_DIR + "null.txt", 'rb').read() +TEST_TEXTS = { "long": LONG, + "short": SHORT, + "unicode": UNICODE, + "null": NULL} + +# test keys +SHORT_SYMMETRIC = open(TEST_KEYS + "short_symmetric.txt", 'rb').read() +LONG_SYMMETRIC = open(TEST_KEYS + "long_symmetric.txt", 'rb').read() +SYMMETRIC_KEYS = [LONG_SYMMETRIC, SHORT_SYMMETRIC] +SYMMETRIC_STRING = open(TEST_KEYS + "short_symmetric.txt").read() + +KEY_1 = TEST_KEYS + "private1.pem", TEST_KEYS + "public1.pem" +KEY_2 = TEST_KEYS + "private2.pem", TEST_KEYS + "public2.pem" +MISMATCH_1 = KEY_1[0], KEY_2[1] +MISMATCH_2 = KEY_2[0], KEY_1[1] +MISSING_PRIVATE_KEY = "notakey.pem", KEY_1[1] +MISSING_PUBLIC_KEY = KEY_1[0], "notakey.pem" +BLANK_PRIVATE_KEY = KEY_1[0], TEST_KEYS + "blank.pem" +BLANK_PUBLIC_KEY = TEST_KEYS + "blank.pem", KEY_1[1] + +def run_n_times(f, g, n): + def err(*args, **kwargs): + if err.n > 0: + err.n -= 1 + return f(*args, **kwargs) + else: + return g(*args, **kwargs) + err.n = n + return err + +class TestCipher(unittest.TestCase): + + def round_trip(self, key, text): + salt, iv, enc = cipher.encrypt(text, key) + output = cipher.decrypt(salt, iv, enc, key) + self.assertEqual(output, text, "Failed to round trip") + + def test_round_trip_short_zero(self): + self.assertRaises(cipher.CipherError, self.round_trip, SHORT_SYMMETRIC, '') + + def test_round_trip_short_long(self): + self.round_trip(SHORT_SYMMETRIC, LONG) + + def test_round_trip_short_short(self): + self.round_trip(SHORT_SYMMETRIC, SHORT) + + def test_round_trip_short_unicode(self): + self.round_trip(SHORT_SYMMETRIC, UNICODE) + + def test_round_trip_short_null(self): + self.round_trip(SHORT_SYMMETRIC, NULL) + + def test_round_trip_long_zero(self): + self.assertRaises(cipher.CipherError, self.round_trip, LONG_SYMMETRIC, '') + + def test_round_trip_long_long(self): + self.round_trip(LONG_SYMMETRIC, LONG) + + def test_round_trip_long_short(self): + self.round_trip(LONG_SYMMETRIC, SHORT) + + def test_round_trip_long_unicode(self): + self.round_trip(LONG_SYMMETRIC, UNICODE) + + def test_round_trip_long_null(self): + self.round_trip(LONG_SYMMETRIC, NULL) + + def test_no_data(self): + iv, salt, enc = cipher.encrypt(UNICODE, SHORT_SYMMETRIC) + self.assertRaises(cipher.CipherError, cipher.decrypt, iv, salt, '', SHORT_SYMMETRIC) + + def test_short_salt(self): + salt, iv, enc = cipher.encrypt(UNICODE, SHORT_SYMMETRIC) + self.assertRaises(cipher.CipherError, cipher.decrypt, salt[:-1], iv, enc, SHORT_SYMMETRIC) + + def test_long_salt(self): + salt, iv, enc = cipher.encrypt(UNICODE, SHORT_SYMMETRIC) + self.assertRaises(cipher.CipherError, cipher.decrypt, salt+salt[:-1], iv, enc, SHORT_SYMMETRIC) + + def test_short_iv(self): + salt, iv, enc = cipher.encrypt(UNICODE, SHORT_SYMMETRIC) + self.assertRaises(cipher.CipherError, cipher.decrypt, salt, iv[:-1], enc, SHORT_SYMMETRIC) + + def test_long_iv(self): + salt, iv, enc = cipher.encrypt(UNICODE, SHORT_SYMMETRIC) + self.assertRaises(cipher.CipherError, cipher.decrypt, salt, iv+iv[:-1], enc, SHORT_SYMMETRIC) + + def test_round_trip_no_password(self): + iv, salt, enc = cipher.encrypt(UNICODE, SHORT_SYMMETRIC) + self.assertRaises(cipher.CipherError, cipher.encrypt, UNICODE, '') + self.assertRaises(cipher.CipherError, cipher.decrypt, iv, salt, enc, '') + + def test_round_trip_failure(self): + salt, iv, enc = cipher.encrypt(SHORT, SHORT_SYMMETRIC) + self.assertRaises(cipher.CipherError, cipher.decrypt, salt, iv, enc, LONG_SYMMETRIC) + salt, iv, enc = cipher.encrypt(NULL, SHORT_SYMMETRIC) + self.assertRaises(cipher.CipherError, cipher.decrypt, salt, iv, enc, LONG_SYMMETRIC) + + def test_bad_rand_bytes(self): + iv, salt, enc = cipher.encrypt(SHORT, SHORT_SYMMETRIC) + rand_bytes = cipher.evp.RAND_bytes + cipher.evp.RAND_bytes = lambda a,b: 0 + self.assertRaises(cipher.CipherError, cipher.encrypt, SHORT, SHORT_SYMMETRIC) + cipher.evp.RAND_bytes = rand_bytes + + def test_bad_rand_bytes_1(self): + iv, salt, enc = cipher.encrypt(SHORT, SHORT_SYMMETRIC) + rand_bytes = cipher.evp.RAND_bytes + cipher.evp.RAND_bytes = lambda a,b: 0 + self.assertRaises(cipher.CipherError, cipher.encrypt, SHORT, SHORT_SYMMETRIC) + cipher.evp.RAND_bytes = rand_bytes + + def test_bad_rand_bytes_2(self): + iv, salt, enc = cipher.encrypt(SHORT, SHORT_SYMMETRIC) + rand_bytes = cipher.evp.RAND_bytes + cipher.evp.RAND_bytes = run_n_times(cipher.evp.RAND_bytes, lambda a,b:0, 1) + self.assertRaises(cipher.CipherError, cipher.encrypt, SHORT, SHORT_SYMMETRIC) + cipher.evp.RAND_bytes = rand_bytes + + def test_bad_hash_by_name(self): + iv, salt, enc = cipher.encrypt(SHORT, SHORT_SYMMETRIC) + hash_by_name = cipher.evp.EVP_get_digestbyname + cipher.evp.EVP_get_digestbyname = lambda a: None + self.assertRaises(cipher.CipherError, cipher.encrypt, SHORT, SHORT_SYMMETRIC) + cipher.evp.EVP_get_digestbyname = hash_by_name + + def test_bad_bytes_to_key(self): + iv, salt, enc = cipher.encrypt(SHORT, SHORT_SYMMETRIC) + bytes_to_key = cipher.evp.EVP_BytesToKey + cipher.evp.EVP_BytesToKey = lambda a,b,c,d,e,f,g,h: 0 + self.assertRaises(cipher.CipherError, cipher.encrypt, SHORT, SHORT_SYMMETRIC) + self.assertRaises(cipher.CipherError, cipher.decrypt, iv, salt, enc, SHORT_SYMMETRIC) + cipher.evp.EVP_BytesToKey = bytes_to_key + + def test_bad_ctx_new(self): + iv, salt, enc = cipher.encrypt(SHORT, SHORT_SYMMETRIC) + new_ctx = cipher.evp.EVP_CIPHER_CTX_new + cipher.evp.EVP_CIPHER_CTX_new = lambda: None + self.assertRaises(cipher.CipherError, cipher.encrypt, SHORT, SHORT_SYMMETRIC) + self.assertRaises(cipher.CipherError, cipher.decrypt, iv, salt, enc, SHORT_SYMMETRIC) + cipher.evp.EVP_CIPHER_CTX_new = new_ctx + + def test_bad_cipher_object(self): + iv, salt, enc = cipher.encrypt(SHORT, SHORT_SYMMETRIC) + cipher_getter = cipher.evp.EVP_aes_192_cbc + cipher.evp.EVP_aes_192_cbc= lambda: None + self.assertRaises(cipher.CipherError, cipher.encrypt, SHORT, SHORT_SYMMETRIC) + self.assertRaises(cipher.CipherError, cipher.decrypt, iv, salt, enc, SHORT_SYMMETRIC) + cipher.evp.EVP_aes_192_cbc = cipher_getter + + def test_bad_encrypt_init_1(self): + encrypt_init = cipher.evp.EVP_EncryptInit_ex + cipher.evp.EVP_EncryptInit_ex = lambda a,b,c,d,e: None + self.assertRaises(cipher.CipherError, cipher.encrypt, SHORT, SHORT_SYMMETRIC) + cipher.evp.EVP_EncryptInit_ex = encrypt_init + + def test_bad_encrypt_init_2(self): + encrypt_init = cipher.evp.EVP_EncryptInit_ex + cipher.evp.EVP_EncryptInit_ex = run_n_times(cipher.evp.EVP_EncryptInit_ex, lambda a,b,c,d,e: None, 1) + self.assertRaises(cipher.CipherError, cipher.encrypt, SHORT, SHORT_SYMMETRIC) + cipher.evp.EVP_EncryptInit_ex = encrypt_init + + def test_bad_encrypt_update(self): + encrypt_update = cipher.evp.EVP_EncryptUpdate + cipher.evp.EVP_EncryptUpdate = lambda a,b,c,d,e: None + self.assertRaises(cipher.CipherError, cipher.encrypt, SHORT, SHORT_SYMMETRIC) + cipher.evp.EVP_EncryptUpdate = encrypt_update + + def test_bad_encrypt_final(self): + encrypt_final = cipher.evp.EVP_EncryptFinal_ex + cipher.evp.EVP_EncryptFinal_ex = lambda a,b,c: None + self.assertRaises(cipher.CipherError, cipher.encrypt, SHORT, SHORT_SYMMETRIC) + cipher.evp.EVP_EncryptFinal_ex = encrypt_final + + def test_bad_decrypt_init(self): + iv, salt, enc = cipher.encrypt(SHORT, SHORT_SYMMETRIC) + decrypt_init = cipher.evp.EVP_DecryptInit_ex + cipher.evp.EVP_DecryptInit_ex = lambda a,b,c,d,e: None + self.assertRaises(cipher.CipherError, cipher.decrypt, iv, salt, enc, SHORT_SYMMETRIC) + cipher.evp.EVP_DecryptInit_ex = decrypt_init + + def test_bad_decrypt_update(self): + iv, salt, enc = cipher.encrypt(SHORT, SHORT_SYMMETRIC) + decrypt_update = cipher.evp.EVP_DecryptUpdate + cipher.evp.EVP_DecryptUpdate = lambda a,b,c,d,e: None + self.assertRaises(cipher.CipherError, cipher.decrypt, iv, salt, enc, SHORT_SYMMETRIC) + cipher.evp.EVP_DecryptUpdate = decrypt_update + + def test_bad_decrypt_final(self): + iv, salt, enc = cipher.encrypt(SHORT, SHORT_SYMMETRIC) + decrypt_final = cipher.evp.EVP_DecryptFinal_ex + cipher.evp.EVP_DecryptFinal_ex = lambda a,b,c: None + self.assertRaises(cipher.CipherError, cipher.decrypt, iv, salt, enc, SHORT_SYMMETRIC) + cipher.evp.EVP_DecryptFinal_ex = decrypt_final + + +class TestSignature(unittest.TestCase): + + def round_trip(self, keys, text): + s = signature.sign(text, keys[0]) + return signature.verify(text, s, keys[1]) + + def round_trip_strings(self, keys, text): + s = signature.sign(text, key=open(keys[0], 'rb').read()) + v = signature.verify(text, s, key=open(keys[1], 'rb').read()) + return v + + def round_trip_all_keys(self, text): + self.assertTrue(self.round_trip(KEY_1, text)) + self.assertTrue(self.round_trip(KEY_2, text)) + self.assertFalse(self.round_trip(MISMATCH_1, text)) + self.assertFalse(self.round_trip(MISMATCH_2, text)) + self.assertTrue(self.round_trip_strings(KEY_1, text)) + self.assertTrue(self.round_trip_strings(KEY_2, text)) + self.assertFalse(self.round_trip_strings(MISMATCH_1, text)) + self.assertFalse(self.round_trip_strings(MISMATCH_2, text)) + + def test_round_trip_long(self): + self.round_trip_all_keys(LONG) + + def test_round_trip_short(self): + self.round_trip_all_keys(SHORT) + + def test_round_trip_unicode(self): + self.round_trip_all_keys(UNICODE) + + def test_round_trip_null(self): + self.round_trip_all_keys(NULL) + + def test_round_trip_zero(self): + self.round_trip_all_keys('') + + def test_arguments(self): + text = SHORT + keys = KEY_1 + self.failUnlessRaises(signature.SignatureError, signature.sign, text, key=open(keys[0], 'rb').read(), keyfile=keys[0]) + self.failUnlessRaises(signature.SignatureError, signature.sign, text) + s = signature.sign(text, keyfile=keys[0]) + self.failUnlessRaises(signature.SignatureError, signature.verify, text, s, key=open(keys[1], 'rb').read(), keyfile=keys[1]) + self.failUnlessRaises(signature.SignatureError, signature.verify, text, s) + + def test_bad_ctx(self): + s = signature.sign(SHORT, KEY_1[0]) + new_ctx = signature.evp.EVP_MD_CTX_create + signature.evp.EVP_MD_CTX_create = lambda: None + self.assertRaises(signature.SignatureError, signature.sign, SHORT, KEY_1[0]) + self.assertRaises(signature.SignatureError, signature.verify, SHORT, s, KEY_1[1]) + signature.evp.EVP_MD_CTX_create = new_ctx + + def test_bad_hash(self): + s = signature.sign(SHORT, KEY_1[0]) + new_hash = signature.evp.EVP_get_digestbyname + signature.evp.EVP_get_digestbyname = lambda a: None + self.assertRaises(signature.SignatureError, signature.sign, SHORT, KEY_1[0]) + self.assertRaises(signature.SignatureError, signature.verify, SHORT, s, KEY_1[1]) + signature.evp.EVP_get_digestbyname = new_hash + + def test_bad_digest_init(self): + s = signature.sign(SHORT, KEY_1[0]) + init = signature.evp.EVP_DigestInit + signature.evp.EVP_DigestInit = lambda a,b: None + self.assertRaises(signature.SignatureError, signature.sign, SHORT, KEY_1[0]) + self.assertRaises(signature.SignatureError, signature.verify, SHORT, s, KEY_1[1]) + signature.evp.EVP_DigestInit = init + + def test_bad_digest_update(self): + s = signature.sign(SHORT, KEY_1[0]) + update = signature.evp.EVP_DigestUpdate + signature.evp.EVP_DigestUpdate = lambda a,b,c: None + self.assertRaises(signature.SignatureError, signature.sign, SHORT, KEY_1[0]) + self.assertRaises(signature.SignatureError, signature.verify, SHORT, s, KEY_1[1]) + signature.evp.EVP_DigestUpdate = update + + def test_bad_sign_final(self): + final = signature.evp.EVP_SignFinal + signature.evp.EVP_SignFinal = lambda a,b,c,d: None + self.assertRaises(signature.SignatureError, signature.sign, SHORT, KEY_1[0]) + signature.evp.EVP_SignFinal = final + + def test_bad_verify_final(self): + s = signature.sign(SHORT, KEY_1[0]) + final = signature.evp.EVP_VerifyFinal + signature.evp.EVP_VerifyFinal = lambda a,b,c,d: None + self.assertRaises(signature.SignatureError, signature.verify, SHORT, s, KEY_1[1]) + signature.evp.EVP_VerifyFinal = final + + def test_bad_read_privatekey(self): + read_private_key = signature.evp.PEM_read_PrivateKey + signature.evp.PEM_read_PrivateKey = lambda a,b,c,d: None + self.assertRaises(signature.SignatureError, signature.sign, SHORT, KEY_1[0]) + signature.evp.PEM_read_PrivateKey = read_private_key + + def test_bad_read_publickey(self): + s = signature.sign(SHORT, KEY_1[0]) + read_public_key = signature.evp.PEM_read_PUBKEY + signature.evp.PEM_read_PUBKEY = lambda a,b,c,d: None + self.assertRaises(signature.SignatureError, signature.verify, SHORT, s, KEY_1[1]) + signature.evp.PEM_read_PUBKEY = read_public_key + + def test_bad_read_bio_privatekey(self): + read_private_key = signature.evp.PEM_read_bio_PrivateKey + signature.evp.PEM_read_bio_PrivateKey = lambda a,b,c,d,e: None + self.assertRaises(signature.SignatureError, signature.sign, SHORT, key=open(KEY_1[0], 'rb').read()) + signature.evp.PEM_read_bio_PrivateKey = read_private_key + + def test_bad_read_bio_publickey(self): + s = signature.sign(SHORT, KEY_1[0]) + read_public_key = signature.evp.PEM_read_bio_PUBKEY + signature.evp.PEM_read_bio_PUBKEY = lambda a,b,c,d: None + self.assertRaises(signature.SignatureError, signature.verify, SHORT, s, key=open(KEY_1[1], 'rb').read()) + signature.evp.PEM_read_bio_PUBKEY = read_public_key + + def test_bad_fopen(self): + s = signature.sign(SHORT, KEY_1[0]) + file_open = signature.evp.fopen + signature.evp.fopen = lambda a,b: None + self.assertRaises(signature.SignatureError, signature.sign, SHORT, KEY_1[1]) + self.assertRaises(signature.SignatureError, signature.verify, SHORT, s, KEY_1[0]) + signature.evp.fopen = file_open + + +class TestEnvelope(unittest.TestCase): + + def round_trip(self, keys, text): + iv, sym_key, enc = envelope.encrypt(text, keys[1]) + return envelope.decrypt(iv, sym_key, enc, keys[0]) + + def round_trip_strings(self, keys, text): + iv, sym_key, enc = envelope.encrypt(text, key=open(keys[1], 'rb').read()) + return envelope.decrypt(iv, sym_key, enc, key=open(keys[0], 'rb').read()) + + def test_round_trip_zero(self): + self.assertRaises(envelope.EnvelopeError, self.round_trip, KEY_1, '') + self.assertRaises(envelope.EnvelopeError, self.round_trip, KEY_2, '') + self.assertRaises(envelope.EnvelopeError, self.round_trip, MISMATCH_1, '') + self.assertRaises(envelope.EnvelopeError, self.round_trip, MISMATCH_2, '') + self.assertRaises(envelope.EnvelopeError, self.round_trip_strings, KEY_1, '') + self.assertRaises(envelope.EnvelopeError, self.round_trip_strings, KEY_2, '') + self.assertRaises(envelope.EnvelopeError, self.round_trip_strings, MISMATCH_1, '') + self.assertRaises(envelope.EnvelopeError, self.round_trip_strings, MISMATCH_2, '') + + def test_round_trip_long(self): + self.assertEqual(self.round_trip_strings(KEY_1, LONG), LONG, "Failed to round trip") + self.assertEqual(self.round_trip_strings(KEY_2, LONG), LONG, "Failed to round trip") + self.assertRaises(envelope.EnvelopeError, self.round_trip_strings, MISMATCH_1, LONG) + self.assertRaises(envelope.EnvelopeError, self.round_trip_strings, MISMATCH_2, LONG) + self.assertEqual(self.round_trip(KEY_1, LONG), LONG, "Failed to round trip") + self.assertEqual(self.round_trip(KEY_2, LONG), LONG, "Failed to round trip") + self.assertRaises(envelope.EnvelopeError, self.round_trip, MISMATCH_1, LONG) + self.assertRaises(envelope.EnvelopeError, self.round_trip, MISMATCH_2, LONG) + + def test_round_trip_short(self): + self.assertEqual(self.round_trip(KEY_1, SHORT), SHORT, "Failed to round trip") + self.assertEqual(self.round_trip(KEY_2, SHORT), SHORT, "Failed to round trip") + self.assertRaises(envelope.EnvelopeError, self.round_trip, MISMATCH_1, SHORT) + self.assertRaises(envelope.EnvelopeError, self.round_trip, MISMATCH_2, SHORT) + self.assertEqual(self.round_trip_strings(KEY_1, SHORT), SHORT, "Failed to round trip") + self.assertEqual(self.round_trip_strings(KEY_2, SHORT), SHORT, "Failed to round trip") + self.assertRaises(envelope.EnvelopeError, self.round_trip_strings, MISMATCH_1, SHORT) + self.assertRaises(envelope.EnvelopeError, self.round_trip_strings, MISMATCH_2, SHORT) + + def test_round_trip_unicode(self): + self.assertEqual(self.round_trip(KEY_1, UNICODE), UNICODE, "Failed to round trip") + self.assertEqual(self.round_trip(KEY_2, UNICODE), UNICODE, "Failed to round trip") + self.assertRaises(envelope.EnvelopeError, self.round_trip, MISMATCH_1, UNICODE) + self.assertRaises(envelope.EnvelopeError, self.round_trip, MISMATCH_2, UNICODE) + self.assertEqual(self.round_trip_strings(KEY_1, UNICODE), UNICODE, "Failed to round trip") + self.assertEqual(self.round_trip_strings(KEY_2, UNICODE), UNICODE, "Failed to round trip") + self.assertRaises(envelope.EnvelopeError, self.round_trip_strings, MISMATCH_1, UNICODE) + self.assertRaises(envelope.EnvelopeError, self.round_trip_strings, MISMATCH_2, UNICODE) + + def test_round_trip_null(self): + self.assertEqual(self.round_trip(KEY_1, NULL), NULL, "Failed to round trip") + self.assertEqual(self.round_trip(KEY_2, NULL), NULL, "Failed to round trip") + self.assertRaises(envelope.EnvelopeError, self.round_trip, MISMATCH_1, NULL) + self.assertRaises(envelope.EnvelopeError, self.round_trip, MISMATCH_2, NULL) + self.assertEqual(self.round_trip_strings(KEY_1, NULL), NULL, "Failed to round trip") + self.assertEqual(self.round_trip_strings(KEY_2, NULL), NULL, "Failed to round trip") + self.assertRaises(envelope.EnvelopeError, self.round_trip_strings, MISMATCH_1, NULL) + self.assertRaises(envelope.EnvelopeError, self.round_trip_strings, MISMATCH_2, NULL) + + def test_bad_keys(self): + self.assertRaises(envelope.EnvelopeError, self.round_trip, MISSING_PUBLIC_KEY, SHORT) + self.assertRaises(envelope.EnvelopeError, self.round_trip, MISSING_PRIVATE_KEY, SHORT) + self.assertRaises(envelope.EnvelopeError, self.round_trip_strings, BLANK_PUBLIC_KEY, SHORT) + self.assertRaises(envelope.EnvelopeError, self.round_trip_strings, BLANK_PRIVATE_KEY, SHORT) + + def test_bad_call(self): + # neither key nor keyfile + self.assertRaises(envelope.EnvelopeError, envelope.encrypt, SHORT) + # both + self.assertRaises(envelope.EnvelopeError, envelope.encrypt, SHORT, KEY_1[0], open(KEY_1[0], 'rb').read()) + # string key instead of bytes + self.assertRaises(envelope.EnvelopeError, envelope.encrypt, SHORT, key=open(KEY_1[0], 'r').read()) + # string data instead of bytes + self.assertRaises(envelope.EnvelopeError, envelope.encrypt, str(SHORT), KEY_1[0], open(KEY_1[0], 'rb').read()) + # get valid encryption data + iv, aes_key, enc = envelope.encrypt(SHORT, KEY_1[1]) + # neither key nor keyfile + self.assertRaises(envelope.EnvelopeError, envelope.decrypt, iv, aes_key, enc) + # both + self.assertRaises(envelope.EnvelopeError, envelope.decrypt, iv, aes_key, enc, KEY_1[1], key=open(KEY_1[1], 'rb').read()) + # string key instead of bytes + self.assertRaises(envelope.EnvelopeError, envelope.decrypt, iv, aes_key, enc, key=open(KEY_1[1], 'r').read()) + # string data instead of bytes + self.assertRaises(envelope.EnvelopeError, envelope.decrypt, iv, aes_key, str(enc), KEY_1[1], key=open(KEY_1[1], 'rb').read()) + # string iv instead of bytes + self.assertRaises(envelope.EnvelopeError, envelope.decrypt, str(iv), aes_key, enc, KEY_1[1], key=open(KEY_1[1], 'rb').read()) + # string key instead of bytes + self.assertRaises(envelope.EnvelopeError, envelope.decrypt, iv, str(aes_key), enc, KEY_1[1], key=open(KEY_1[1], 'rb').read()) + + def test_bad_rand_bytes_1(self): + rand_bytes = envelope.evp.RAND_bytes + envelope.evp.RAND_bytes = lambda a,b:0 + self.assertRaises(envelope.EnvelopeError, envelope.encrypt, SHORT, KEY_1[1]) + envelope.evp.RAND_bytes = rand_bytes + + def test_bad_rand_bytes_2(self): + rand_bytes = envelope.evp.RAND_bytes + envelope.evp.RAND_bytes = run_n_times(evp.RAND_bytes, lambda a,b:0, 1) + self.assertRaises(envelope.EnvelopeError, envelope.encrypt, SHORT, KEY_1[1]) + envelope.evp.RAND_bytes = rand_bytes + + def test_bad_rsa_size(self): + rsa_size = envelope.evp.RSA_size + envelope.evp.RSA_size = lambda a:0 + self.assertRaises(envelope.EnvelopeError, envelope.encrypt, SHORT, KEY_1[1]) + envelope.evp.RSA_size = rsa_size + + def test_bad_read_privatekey(self): + iv, key, enc = envelope.encrypt(SHORT, KEY_1[1]) + read_private_key = envelope.evp.PEM_read_PrivateKey + envelope.evp.PEM_read_PrivateKey = lambda a,b,c,d: None + self.assertRaises(envelope.EnvelopeError, envelope.decrypt, iv, key, enc, KEY_1[0]) + envelope.evp.PEM_read_PrivateKey = read_private_key + + def test_bad_read_publickey(self): + read_public_key = envelope.evp.PEM_read_PUBKEY + envelope.evp.PEM_read_PUBKEY = lambda a,b,c,d: None + self.assertRaises(envelope.EnvelopeError, envelope.encrypt, SHORT, KEY_1[1]) + envelope.evp.PEM_read_PUBKEY = read_public_key + + def test_bad_read_bio_privatekey(self): + iv, key, enc = envelope.encrypt(SHORT, KEY_1[1]) + read_private_key = envelope.evp.PEM_read_bio_PrivateKey + envelope.evp.PEM_read_bio_PrivateKey = lambda a,b,c,d: None + self.assertRaises(envelope.EnvelopeError, envelope.decrypt, iv, key, enc, key=open(KEY_1[0], 'rb').read()) + envelope.evp.PEM_read_bio_PrivateKey = read_private_key + + def test_bad_read_bio_publickey(self): + read_public_key = envelope.evp.PEM_read_bio_PUBKEY + envelope.evp.PEM_read_bio_PUBKEY = lambda a,b,c,d: None + self.assertRaises(envelope.EnvelopeError, envelope.encrypt, SHORT, key=open(KEY_1[0], 'rb').read()) + envelope.evp.PEM_read_bio_PUBKEY = read_public_key + + def test_bad_fopen(self): + iv, aes_key, enc = envelope.encrypt(SHORT, KEY_1[1]) + file_open = envelope.evp.fopen + envelope.evp.fopen = lambda a,b: None + self.assertRaises(envelope.EnvelopeError, envelope.encrypt, SHORT, KEY_1[1]) + self.assertRaises(envelope.EnvelopeError, envelope.decrypt, iv, aes_key, enc, KEY_1[0]) + envelope.evp.fopen = file_open + + def test_bad_ctx_new(self): + iv, aes_key, enc = envelope.encrypt(SHORT, KEY_1[1]) + new_ctx = envelope.evp.EVP_CIPHER_CTX_new + envelope.evp.EVP_CIPHER_CTX_new = lambda: None + self.assertRaises(envelope.EnvelopeError, envelope.encrypt, SHORT, KEY_1[1]) + self.assertRaises(envelope.EnvelopeError, envelope.decrypt, iv, aes_key, enc, KEY_1[0]) + envelope.evp.EVP_CIPHER_CTX_new = new_ctx + + def test_bad_cipher_object(self): + iv, key, enc = envelope.encrypt(SHORT, KEY_1[1]) + cipher_getter = envelope.evp.EVP_aes_192_cbc + envelope.evp.EVP_aes_192_cbc= lambda: None + self.assertRaises(envelope.EnvelopeError, envelope.encrypt, SHORT, KEY_1[1]) + self.assertRaises(envelope.EnvelopeError, envelope.decrypt, iv, key, enc, KEY_1[0]) + envelope.evp.EVP_aes_192_cbc = cipher_getter + + def test_bad_encrypt_init_1(self): + encrypt_init = envelope.evp.EVP_EncryptInit_ex + envelope.evp.EVP_EncryptInit_ex = lambda a,b,c,d,e: None + self.assertRaises(envelope.EnvelopeError, envelope.encrypt, SHORT, KEY_1[1]) + envelope.evp.EVP_EncryptInit_ex = encrypt_init + + def test_bad_encrypt_init_2(self): + encrypt_init = envelope.evp.EVP_EncryptInit_ex + envelope.evp.EVP_EncryptInit_ex = run_n_times(envelope.evp.EVP_EncryptInit_ex, lambda a,b,c,d,e: None, 1) + self.assertRaises(envelope.EnvelopeError, envelope.encrypt, SHORT, KEY_1[1]) + envelope.evp.EVP_EncryptInit_ex = encrypt_init + + def test_bad_encrypt_update(self): + encrypt_update = envelope.evp.EVP_EncryptUpdate + envelope.evp.EVP_EncryptUpdate = lambda a,b,c,d,e: None + self.assertRaises(envelope.EnvelopeError, envelope.encrypt, SHORT, KEY_1[1]) + envelope.evp.EVP_EncryptUpdate = encrypt_update + + def test_bad_encrypt_final(self): + encrypt_final = envelope.evp.EVP_EncryptFinal_ex + envelope.evp.EVP_EncryptFinal_ex = lambda a,b,c: None + self.assertRaises(envelope.EnvelopeError, envelope.encrypt, SHORT, KEY_1[1]) + envelope.evp.EVP_EncryptFinal_ex = encrypt_final + + def test_bad_open_init(self): + iv, key, enc = envelope.encrypt(SHORT, KEY_1[1]) + decrypt_init = envelope.evp.EVP_OpenInit + envelope.evp.EVP_OpenInit = lambda a,b,c,d,e,f: None + self.assertRaises(envelope.EnvelopeError, envelope.decrypt, iv, key, enc, KEY_1[0]) + envelope.evp.EVP_OpenInit = decrypt_init + + def test_bad_decrypt_update(self): + iv, key, enc = envelope.encrypt(SHORT, KEY_1[1]) + decrypt_update = envelope.evp.EVP_DecryptUpdate + envelope.evp.EVP_DecryptUpdate = lambda a,b,c,d,e: None + self.assertRaises(envelope.EnvelopeError, envelope.decrypt, iv, key, enc, KEY_1[0]) + envelope.evp.EVP_DecryptUpdate = decrypt_update + + def test_bad_decrypt_final(self): + iv, key, enc = envelope.encrypt(SHORT, KEY_1[1]) + decrypt_final = envelope.evp.EVP_DecryptFinal_ex + envelope.evp.EVP_DecryptFinal_ex = lambda a,b,c: None + self.assertRaises(envelope.EnvelopeError, envelope.decrypt, iv, key, enc, KEY_1[0]) + cipher.evp.EVP_DecryptFinal_ex = decrypt_final + + def test_bad_rsa_get(self): + get_rsa = envelope.evp.EVP_PKEY_get1_RSA + envelope.evp.EVP_PKEY_get1_RSA = lambda a: None + self.assertRaises(envelope.EnvelopeError, envelope.encrypt, SHORT, KEY_1[1]) + envelope.evp.EVP_PKEY_get1_RSA = get_rsa + + def test_bad_rsa_encrypt(self): + encrypt_rsa = envelope.evp.RSA_public_encrypt + envelope.evp.RSA_public_encrypt = lambda a,b,c,d,e: None + self.assertRaises(envelope.EnvelopeError, envelope.encrypt, SHORT, KEY_1[1]) + envelope.evp.RSA_public_encrypt = encrypt_rsa + + def test_bad_read_privatekey(self): + iv, key, enc = envelope.encrypt(SHORT, KEY_1[1]) + read_private_key = envelope.evp.PEM_read_PrivateKey + envelope.evp.PEM_read_PrivateKey = lambda a,b,c,d: None + self.assertRaises(envelope.EnvelopeError, envelope.decrypt, iv, key, enc, KEY_1[0]) + envelope.evp.PEM_read_PrivateKey = read_private_key + + def test_bad_read_publickey(self): + read_public_key = envelope.evp.PEM_read_PUBKEY + envelope.evp.PEM_read_PUBKEY = lambda a,b,c,d: None + self.assertRaises(envelope.EnvelopeError, envelope.encrypt, SHORT, KEY_1[1]) + envelope.evp.PEM_read_PUBKEY = read_public_key + + def test_bad_read_bio_privatekey(self): + iv, key, enc = envelope.encrypt(SHORT, KEY_1[1]) + read_private_key = envelope.evp.PEM_read_bio_PrivateKey + envelope.evp.PEM_read_bio_PrivateKey = lambda a,b,c,d: None + self.assertRaises(envelope.EnvelopeError, envelope.decrypt, iv, key, enc, key=open(KEY_1[0], 'rb').read()) + envelope.evp.PEM_read_bio_PrivateKey = read_private_key + + def test_bad_read_bio_publickey(self): + read_public_key = envelope.evp.PEM_read_bio_PUBKEY + envelope.evp.PEM_read_bio_PUBKEY = lambda a,b,c,d: None + self.assertRaises(envelope.EnvelopeError, envelope.encrypt, SHORT, key=open(KEY_1[0], 'rb').read()) + envelope.evp.PEM_read_bio_PUBKEY = read_public_key + + def test_bad_fopen(self): + iv, aes_key, enc = envelope.encrypt(SHORT, KEY_1[1]) + file_open = envelope.evp.fopen + envelope.evp.fopen = lambda a,b: None + self.assertRaises(envelope.EnvelopeError, envelope.encrypt, SHORT, KEY_1[1]) + self.assertRaises(envelope.EnvelopeError, envelope.decrypt, iv, aes_key, enc, KEY_1[0]) + envelope.evp.fopen = file_open + + def test_bad_ctx_new(self): + iv, aes_key, enc = envelope.encrypt(SHORT, KEY_1[1]) + new_ctx = envelope.evp.EVP_CIPHER_CTX_new + envelope.evp.EVP_CIPHER_CTX_new = lambda: None + self.assertRaises(envelope.EnvelopeError, envelope.encrypt, SHORT, KEY_1[1]) + self.assertRaises(envelope.EnvelopeError, envelope.decrypt, iv, aes_key, enc, KEY_1[0]) + envelope.evp.EVP_CIPHER_CTX_new = new_ctx + + def test_bad_cipher_object(self): + iv, key, enc = envelope.encrypt(SHORT, KEY_1[1]) + cipher_getter = envelope.evp.EVP_aes_192_cbc + envelope.evp.EVP_aes_192_cbc= lambda: None + self.assertRaises(envelope.EnvelopeError, envelope.encrypt, SHORT, KEY_1[1]) + self.assertRaises(envelope.EnvelopeError, envelope.decrypt, iv, key, enc, KEY_1[0]) + envelope.evp.EVP_aes_192_cbc = cipher_getter + + def test_bad_encrypt_init(self): + encrypt_init = envelope.evp.EVP_EncryptInit_ex + envelope.evp.EVP_EncryptInit_ex = lambda a,b,c,d,e: None + self.assertRaises(envelope.EnvelopeError, envelope.encrypt, SHORT, KEY_1[1]) + envelope.evp.EVP_EncryptInit_ex = encrypt_init + + def test_bad_encrypt_update(self): + encrypt_update = envelope.evp.EVP_EncryptUpdate + envelope.evp.EVP_EncryptUpdate = lambda a,b,c,d,e: None + self.assertRaises(envelope.EnvelopeError, envelope.encrypt, SHORT, KEY_1[1]) + envelope.evp.EVP_EncryptUpdate = encrypt_update + + def test_bad_encrypt_final(self): + encrypt_final = envelope.evp.EVP_EncryptFinal_ex + envelope.evp.EVP_EncryptFinal_ex = lambda a,b,c: None + self.assertRaises(envelope.EnvelopeError, envelope.encrypt, SHORT, KEY_1[1]) + envelope.evp.EVP_EncryptFinal_ex = encrypt_final + + def test_bad_open_init(self): + iv, key, enc = envelope.encrypt(SHORT, KEY_1[1]) + decrypt_init = envelope.evp.EVP_OpenInit + envelope.evp.EVP_OpenInit = lambda a,b,c,d,e,f: None + self.assertRaises(envelope.EnvelopeError, envelope.decrypt, iv, key, enc, KEY_1[0]) + envelope.evp.EVP_OpenInit = decrypt_init + + def test_bad_decrypt_update(self): + iv, key, enc = envelope.encrypt(SHORT, KEY_1[1]) + decrypt_update = envelope.evp.EVP_DecryptUpdate + envelope.evp.EVP_DecryptUpdate = lambda a,b,c,d,e: None + self.assertRaises(envelope.EnvelopeError, envelope.decrypt, iv, key, enc, KEY_1[0]) + envelope.evp.EVP_DecryptUpdate = decrypt_update + + def test_bad_decrypt_final(self): + iv, key, enc = envelope.encrypt(SHORT, KEY_1[1]) + decrypt_final = envelope.evp.EVP_DecryptFinal_ex + envelope.evp.EVP_DecryptFinal_ex = lambda a,b,c: None + self.assertRaises(envelope.EnvelopeError, envelope.decrypt, iv, key, enc, KEY_1[0]) + cipher.evp.EVP_DecryptFinal_ex = decrypt_final + + def test_bad_rsa_get(self): + get_rsa = envelope.evp.EVP_PKEY_get1_RSA + envelope.evp.EVP_PKEY_get1_RSA = lambda a: None + self.assertRaises(envelope.EnvelopeError, envelope.encrypt, SHORT, KEY_1[1]) + envelope.evp.EVP_PKEY_get1_RSA = get_rsa + + def test_bad_rsa_encrypt(self): + encrypt_rsa = envelope.evp.RSA_public_encrypt + envelope.evp.RSA_public_encrypt = lambda a,b,c,d,e: None + self.assertRaises(envelope.EnvelopeError, envelope.encrypt, SHORT, KEY_1[1]) + envelope.evp.RSA_public_encrypt = encrypt_rsa + +if __name__ == "__main__": + unittest.main() diff --git a/src/quickstart.py b/src/quickstart.py new file mode 100755 index 00000000..765002b7 --- /dev/null +++ b/src/quickstart.py @@ -0,0 +1,399 @@ +""" + + quickstart.py + + + Vladimir Diaz + + + June 2012. Based on a previous version by Geremy Condra. + + + See LICENSE for licensing information. + + + This script acts as a handy quickstart for TUF, helping project and + repository maintainers get into the game as quickly and painlessly as + possible. 'quickstart.py' creates the metadata files for all the top-level + roles (along with their respective cryptographic keys), all of the + target files specified by the user, and a configuration file named + 'config.cfg'. The user may then use the 'signercli' script to modify, + if they wish, the basic repository created by 'quickstart.py'. + + If executed successfully, 'quickstart.py' saves the 'repository', 'keystore', + and 'client' directories to the current directory. The 'repository' directory + should be transferred to the server responding to TUF repository requests. + 'keystore' and the individual encrypted key files should be securely stored + and managed by the repository maintainer; these files will be needed again + when modifying the metadata files. 'client' should be initially distributed + to users by the software updater utilizing TUF. + + + $ python quickstart.py --