python-tuf/evpy/envelope.py

326 lines
9.8 KiB
Python
Executable file

#! /usr/bin/env python
"""
envelope.py
Written by Geremy Condra
Released on 18 March 2010
Licensed under MIT License
This module provides a basic interface to OpenSSL's EVP
envelope functions.
In a nutshell, these functions are designed to provide
the primary benefit of public key cryptography (the
ability to provide secrecy without first sharing a
secret) without its primary downside (small message
length). It does this by generating a random AES key
with which to encrypt the data, then encrypting that
key against the provided RSA key.
This means that if you have an application in which
you wish to share sensitive data but do not wish to
share a common secret, this is your module. Be aware
that compromising your private key is effectively
game over with this scheme.
If you require a shared secret and want the key to
be human readable, then you will probably want to
use the cipher module instead.
All the functions in this module raise EnvelopeError on
malfunction.
Usage:
>>> from evpy import envelope
>>> f = open("test/short.txt", "rb")
>>> data = f.read()
>>> public_key = "test/keys/public1.pem"
>>> private_key = "test/keys/private1.pem"
>>> iv, key, ciphertext = envelope.encrypt(data, public_key)
>>> envelope.decrypt(iv, key, ciphertext, private_key) == data
True
"""
import ctypes
import evp
from signature import _string_to_bio
class EnvelopeError(evp.SSLError):
pass
def _build_dkey_from_file(keyfile):
fp = evp.fopen(keyfile, "r")
if not fp:
raise EnvelopeError("Could not open keyfile")
# get the decryption key
skey = evp.PEM_read_PrivateKey(fp, None, None, None)
if not skey:
evp.fclose(fp)
raise EnvelopeError("Could not read decryption key")
# close the file
evp.fclose(fp)
return skey
def _build_dkey_from_string(key):
bio = _string_to_bio(key)
dkey = evp.PEM_read_bio_PrivateKey(bio, None, None, None)
if not dkey:
raise EnvelopeError("Could not build decryption key from string")
evp.BIO_free(bio)
return dkey
def _build_ekey_from_file(keyfile):
fp = evp.fopen(keyfile, "r")
if not fp:
raise EnvelopeError("Could not open keyfile")
# get the encryption key
ekey = evp.PEM_read_PUBKEY(fp, None, None, None)
if not ekey:
evp.fclose(fp)
raise EnvelopeError("Could not read encryption key")
# close the file
evp.fclose(fp)
return ekey
def _build_ekey_from_string(key):
bio = _string_to_bio(key)
ekey = evp.PEM_read_bio_PUBKEY(bio, None, None, None)
if not ekey:
raise EnvelopeError("Could not create encryption key from string")
evp.BIO_free(bio)
return ekey
def _build_bio():
method = evp.BIO_s_mem()
return evp.BIO_new(method);
def _asn1_hex_to_int(value):
print(value)
return int(''.join(value.split(':')), 16)
def _parse_printed_key(k):
attrs = {}
current = ""
current_attr = ""
for line in k.splitlines()[1:]:
# its a continuation of the current block
if line.startswith(' '):
current += line.strip()
else:
# special case the public exponent
if "publicExponent" in current_attr:
attrs['publicExponent'] = int(current_attr.split()[1])
elif current_attr:
attrs[current_attr] = _asn1_hex_to_int(current)
current_attr = line.strip(':')
current = ""
translator = {'publicExponent': 'e', 'privateExponent': 'd', 'modulus': 'n', 'prime1': 'p', 'prime2': 'q'}
translated_attrs = {}
for key, value in attrs.items():
try:
translated_attrs[translator[key]] = value
except: pass
return translated_attrs
def keygen(bitlength=1024, e=65537, pem=True):
key = evp.RSA_generate_key(bitlength, e, None, None)
if not key:
raise EnvelopeError("Could not generate key")
if pem:
private_bio = evp.BIO_new(evp.BIO_s_mem())
if not private_bio:
raise KeygenError("Could not create temporary storage")
public_bio = evp.BIO_new(evp.BIO_s_mem())
if not public_bio:
raise KeygenError("Could not create temporary storage")
private_buf = ctypes.create_string_buffer('', 65537)
if not private_buf:
raise MemoryError("Could not allocate key storage")
public_buf = ctypes.create_string_buffer('', 65537)
if not public_buf:
raise MemoryError("Could not allocate key storage")
if not evp.PEM_write_bio_RSAPrivateKey(private_bio, key, None, None, 0, 0, None):
raise KeygenError("Could not write private key")
if not evp.PEM_write_bio_RSA_PUBKEY(public_bio, key):
raise KeygenError("Could not write public key")
public_len = evp.BIO_read(public_bio, public_buf, 65537)
private_len = evp.BIO_read(private_bio, private_buf, 65537)
evp.BIO_free(public_bio)
evp.BIO_free(private_bio)
return public_buf.value, private_buf.value
else:
# we go through this rigamarole because if there's an engine
# in place it won't populate the RSA key's values properly.
key_bio = evp.BIO_new(evp.BIO_s_mem())
if not key_bio:
raise KeygenError("Could not create temporary storage")
if not evp.RSA_print(key_bio, key, 0):
raise KeygenError("Could not stringify key")
key_buf = ctypes.create_string_buffer('', 65537)
if not key_buf:
raise MemoryError("Could not allocate key storage")
evp.BIO_read(key_bio, key_buf, 65537)
evp.BIO_free(key_bio)
key_string = key_buf.value
return key, _parse_printed_key(key_string)
def encrypt(data, keyfile=None, key=None):
"""Encrypts the given data, raising EnvelopeError on failure.
This uses AES192 to do bulk encryption and RSA to encrypt
the given public key.
Usage:
>>> from evpy import envelope
>>> f = open("test/short.txt", "rb")
>>> data = f.read()
>>> public_key = "test/keys/public1.pem"
>>> private_key = "test/keys/private1.pem"
>>> iv, key, ciphertext = envelope.encrypt(data, public_key)
>>> envelope.decrypt(iv, key, ciphertext, private_key) == data
True
"""
# validate the incoming data
if not data:
raise EnvelopeError("Incoming data must be bytes")
if not len(data):
raise EnvelopeError("Data must actually exist")
# build and initialize the context
ctx = evp.EVP_CIPHER_CTX_new()
if not ctx:
raise EnvelopeError("Could not create context")
evp.EVP_CIPHER_CTX_init(ctx)
# get the key from the keyfile
if key and not keyfile:
ekey = _build_ekey_from_string(key)
elif keyfile and not key:
ekey = _build_ekey_from_file(keyfile)
else:
raise EnvelopeError("Must specify exactly one key or keyfile")
# get the cipher object
cipher_object = evp.EVP_aes_192_cbc()
if not cipher_object:
raise EnvelopeError("Could not create cipher object")
# finish the context and cipher object
if not evp.EVP_EncryptInit_ex(ctx, cipher_object, None, None, None):
raise EnvelopeError("Could not finish context")
# build the randomized iv
iv_length = evp.EVP_CIPHER_CTX_iv_length(ctx)
iv = ctypes.create_string_buffer(iv_length)
for i in range(1000):
if evp.RAND_bytes(iv, iv_length): break
else:
raise EnvelopeError("Could not generate enough entropy for IV")
output_iv = iv.raw
# build the randomized AES key
keysize = evp.EVP_CIPHER_key_length(cipher_object)
aes_key = ctypes.create_string_buffer(keysize)
for i in range(1000):
if evp.RAND_bytes(aes_key, keysize): break
else:
raise EnvelopeError("Could not generate enough entropy for AES key")
# extract the RSA key
rsa_key = evp.EVP_PKEY_get1_RSA(ekey)
if not rsa_key:
raise EnvelopeError("Could not get RSA key")
# encrypt it
buf_size = evp.RSA_size(rsa_key)
if not buf_size:
raise EnvelopeError("Invalid RSA keysize")
encrypted_aes_key = ctypes.create_string_buffer(buf_size)
# RSA_PKCS1_PADDING is defined as 1
written = evp.RSA_public_encrypt(keysize, aes_key, encrypted_aes_key, rsa_key, 1)
if not written:
raise EnvelopeError("Could not encrypt AES key")
output_key = encrypted_aes_key.raw[:written]
# initialize the encryption operation
if not evp.EVP_EncryptInit_ex(ctx, None, None, aes_key, iv):
raise EnvelopeError("Could not start encryption operation")
# build the output buffer
buf = ctypes.create_string_buffer(len(data) + 16)
written = ctypes.c_int(0)
final = ctypes.c_int(0)
# update
if not evp.EVP_EncryptUpdate(ctx, buf, ctypes.byref(written), data, len(data)):
raise EnvelopeError("Could not update ciphertext")
output = buf.raw[:written.value]
# finalize
if not evp.EVP_EncryptFinal_ex(ctx, buf, ctypes.byref(final)):
raise EnvelopeError("Could not finalize ciphertext")
output += buf.raw[:final.value]
# ...and go home
return output_iv, output_key, output
def decrypt(iv, encrypted_aes_key, data, keyfile=None, key=None):
"""Decrypts the given ciphertext, raising EnvelopeError on failure.
Usage:
>>> from evpy import envelope
>>> f = open("test/short.txt", "rb")
>>> data = f.read()
>>> public_key = "test/keys/public1.pem"
>>> private_key = "test/keys/private1.pem"
>>> iv, key, ciphertext = envelope.encrypt(data, public_key)
>>> envelope.decrypt(iv, key, ciphertext, private_key) == data
True
"""
# build and initialize the context
ctx = evp.EVP_CIPHER_CTX_new()
if not ctx:
raise EnvelopeError("Could not create context")
evp.EVP_CIPHER_CTX_init(ctx)
# get the cipher object
cipher_object = evp.EVP_aes_192_cbc()
if not cipher_object:
raise EnvelopeError("Could not create cipher object")
# get the key from the keyfile
if key and not keyfile:
dkey = _build_dkey_from_string(key)
elif keyfile and not key:
dkey = _build_dkey_from_file(keyfile)
else:
raise EnvelopeError("Must specify exactly one key or keyfile")
# open the envelope
if not evp.EVP_OpenInit(ctx, cipher_object, encrypted_aes_key, len(encrypted_aes_key), iv, dkey):
raise EnvelopeError("Could not open envelope")
# build the output buffer
buf = ctypes.create_string_buffer(len(data) + 16)
written = ctypes.c_int(0)
final = ctypes.c_int(0)
# update
if not evp.EVP_DecryptUpdate(ctx, buf, ctypes.byref(written), data, len(data)):
raise EnvelopeError("Could not update envelope")
output = buf.raw[:written.value]
# finalize
if not evp.EVP_DecryptFinal_ex(ctx, buf, ctypes.byref(final)):
raise EnvelopeError("Could not finalize envelope")
output += buf.raw[:final.value]
return output