mirror of
https://github.com/theupdateframework/python-tuf
synced 2026-05-24 10:08:28 +00:00
- "quickstart.py:292: No global (EEXIST) found" - "basic_client.py:194: No global (option_parser) found" - "rsa_key.py:108: No global (EnvelopeError) found" - "rsa_key.py:108: No global (KeygenError) found" - "util.py:56: No global (temp_dir) found"
329 lines
9.9 KiB
Python
Executable file
329 lines
9.9 KiB
Python
Executable file
#! /usr/bin/env python
|
|
|
|
"""
|
|
envelope.py
|
|
|
|
Written by Geremy Condra
|
|
Released on 18 March 2010
|
|
Licensed under MIT License
|
|
|
|
This module provides a basic interface to OpenSSL's EVP
|
|
envelope functions.
|
|
|
|
In a nutshell, these functions are designed to provide
|
|
the primary benefit of public key cryptography (the
|
|
ability to provide secrecy without first sharing a
|
|
secret) without its primary downside (small message
|
|
length). It does this by generating a random AES key
|
|
with which to encrypt the data, then encrypting that
|
|
key against the provided RSA key.
|
|
|
|
This means that if you have an application in which
|
|
you wish to share sensitive data but do not wish to
|
|
share a common secret, this is your module. Be aware
|
|
that compromising your private key is effectively
|
|
game over with this scheme.
|
|
|
|
If you require a shared secret and want the key to
|
|
be human readable, then you will probably want to
|
|
use the cipher module instead.
|
|
|
|
All the functions in this module raise EnvelopeError on
|
|
malfunction.
|
|
|
|
Usage:
|
|
|
|
>>> from evpy import envelope
|
|
>>> f = open("test/short.txt", "rb")
|
|
>>> data = f.read()
|
|
>>> public_key = "test/keys/public1.pem"
|
|
>>> private_key = "test/keys/private1.pem"
|
|
>>> iv, key, ciphertext = envelope.encrypt(data, public_key)
|
|
>>> envelope.decrypt(iv, key, ciphertext, private_key) == data
|
|
True
|
|
"""
|
|
|
|
import ctypes
|
|
|
|
import evp
|
|
from signature import _string_to_bio
|
|
|
|
class EnvelopeError(evp.SSLError):
    """Raised by this module when loading a key or sealing/opening an envelope fails."""
|
|
|
|
class KeygenError(evp.SSLError):
    """Raised by keygen() when a freshly generated RSA key cannot be exported."""
|
|
|
|
def _build_dkey_from_file(keyfile):
    """Load the PEM private (decryption) key stored in the file `keyfile`.

    Returns the key handle produced by evp.PEM_read_PrivateKey.
    Raises EnvelopeError if the file cannot be opened or no key can be
    read from it.
    """
    handle = evp.fopen(keyfile, "r")
    if not handle:
        raise EnvelopeError("Could not open keyfile")
    private_key = evp.PEM_read_PrivateKey(handle, None, None, None)
    # the file is finished with as soon as the read has been attempted,
    # whether or not it produced a key
    evp.fclose(handle)
    if not private_key:
        raise EnvelopeError("Could not read decryption key")
    return private_key
|
|
|
|
def _build_dkey_from_string(key):
    """Build a private (decryption) key from the PEM text in `key`.

    The text is wrapped in a BIO by _string_to_bio and parsed with
    evp.PEM_read_bio_PrivateKey. Returns the resulting key handle.
    Raises EnvelopeError if no key can be parsed from the text.
    """
    bio = _string_to_bio(key)
    dkey = evp.PEM_read_bio_PrivateKey(bio, None, None, None)
    # free the BIO before inspecting the result so that it is released on
    # the failure path as well as on success
    evp.BIO_free(bio)
    if not dkey:
        raise EnvelopeError("Could not build decryption key from string")
    return dkey
|
|
|
|
def _build_ekey_from_file(keyfile):
    """Load the PEM public (encryption) key stored in the file `keyfile`.

    Returns the key handle produced by evp.PEM_read_PUBKEY.
    Raises EnvelopeError if the file cannot be opened or no key can be
    read from it.
    """
    handle = evp.fopen(keyfile, "r")
    if not handle:
        raise EnvelopeError("Could not open keyfile")
    public_key = evp.PEM_read_PUBKEY(handle, None, None, None)
    # the file is finished with as soon as the read has been attempted,
    # whether or not it produced a key
    evp.fclose(handle)
    if not public_key:
        raise EnvelopeError("Could not read encryption key")
    return public_key
|
|
|
|
def _build_ekey_from_string(key):
    """Build a public (encryption) key from the PEM text in `key`.

    The text is wrapped in a BIO by _string_to_bio and parsed with
    evp.PEM_read_bio_PUBKEY. Returns the resulting key handle.
    Raises EnvelopeError if no key can be parsed from the text.
    """
    bio = _string_to_bio(key)
    ekey = evp.PEM_read_bio_PUBKEY(bio, None, None, None)
    # free the BIO before inspecting the result so that it is released on
    # the failure path as well as on success
    evp.BIO_free(bio)
    if not ekey:
        raise EnvelopeError("Could not create encryption key from string")
    return ekey
|
|
|
|
def _build_bio():
    """Return a new BIO created from the evp.BIO_s_mem() method."""
    return evp.BIO_new(evp.BIO_s_mem())
|
|
|
|
def _asn1_hex_to_int(value):
    """Convert colon-separated hex digits (e.g. "00:c0:ff") to an integer.

    Raises ValueError if, once the colons are removed, `value` is not a
    valid hexadecimal number (this includes the empty string).
    """
    # The value is deliberately not printed or logged: _parse_printed_key
    # feeds private key components through this helper.
    return int(value.replace(':', ''), 16)
|
|
|
|
def _parse_printed_key(k):
    """Extract RSA key components from printed key text.

    `k` is the text keygen() obtains from evp.RSA_print. Its first line is
    skipped. Every later line that does not start with a space opens a new
    attribute, and the space-indented lines beneath it are concatenated
    into that attribute's colon-separated hex value. The "publicExponent"
    line is the exception: its decimal value is read from the header line
    itself.

    Returns a dict mapping the short names 'e', 'd', 'n', 'p' and 'q' to
    integers. Attributes that have no short name are dropped.
    """
    translator = {'publicExponent': 'e', 'privateExponent': 'd', 'modulus': 'n', 'prime1': 'p', 'prime2': 'q'}
    attrs = {}
    current = ""
    current_attr = ""
    for line in k.splitlines()[1:]:
        # it's a continuation of the current block
        if line.startswith(' '):
            current += line.strip()
            continue
        # a new header line closes the attribute collected so far;
        # special case the public exponent
        if "publicExponent" in current_attr:
            attrs['publicExponent'] = int(current_attr.split()[1])
        elif current_attr:
            attrs[current_attr] = _asn1_hex_to_int(current)
        current_attr = line.strip(':')
        current = ""

    # The last attribute has no following header line to close it, so it
    # must be flushed here; otherwise it is silently lost. A trailing
    # header that collected no hex data is ignored rather than parsed.
    if "publicExponent" in current_attr:
        attrs['publicExponent'] = int(current_attr.split()[1])
    elif current_attr and current:
        attrs[current_attr] = _asn1_hex_to_int(current)

    translated_attrs = {}
    for name, value in attrs.items():
        if name in translator:
            translated_attrs[translator[name]] = value
    return translated_attrs
|
|
|
|
|
|
def _export_via_bio(writer, failure_message, size=65537):
    """Run writer(bio) against a fresh memory BIO and return what it wrote.

    `writer` is called with the BIO and must return a true value on
    success. At most `size` bytes are read back from the BIO. The BIO is
    freed on every path, including failures.

    Raises KeygenError if the BIO cannot be created, if `writer` reports
    failure (using `failure_message`), or if nothing can be read back.
    """
    bio = evp.BIO_new(evp.BIO_s_mem())
    if not bio:
        raise KeygenError("Could not create temporary storage")
    try:
        if not writer(bio):
            raise KeygenError(failure_message)
        buf = ctypes.create_string_buffer(size)
        length = evp.BIO_read(bio, buf, size)
        # a zero or negative count means no data was obtained from the BIO
        if length <= 0:
            raise KeygenError("Could not read key from temporary storage")
        return buf.raw[:length]
    finally:
        evp.BIO_free(bio)


def keygen(bitlength=1024, e=65537, pem=True):
    """Generate an RSA key with evp.RSA_generate_key.

    bitlength -- key size passed to evp.RSA_generate_key
    e         -- public exponent passed to evp.RSA_generate_key
    pem       -- selects the return format (see below)

    When `pem` is true, returns (public_pem, private_pem), the two PEM
    encodings of the key. Otherwise returns (key, attrs), where `key` is
    the handle from evp.RSA_generate_key and `attrs` is the dict built by
    _parse_printed_key from the printed form of the key.

    Raises EnvelopeError if the key cannot be generated and KeygenError if
    it cannot be exported.
    """
    key = evp.RSA_generate_key(bitlength, e, None, None)
    if not key:
        raise EnvelopeError("Could not generate key")
    # NOTE(review): the RSA handle is not released in the pem branch; no
    # free routine from evp is used anywhere in this file - confirm whether
    # evp exposes one.
    if pem:
        private_pem = _export_via_bio(
            lambda bio: evp.PEM_write_bio_RSAPrivateKey(bio, key, None, None, 0, 0, None),
            "Could not write private key")
        public_pem = _export_via_bio(
            lambda bio: evp.PEM_write_bio_RSA_PUBKEY(bio, key),
            "Could not write public key")
        return public_pem, private_pem
    # we go through this rigamarole because if there's an engine
    # in place it won't populate the RSA key's values properly.
    key_string = _export_via_bio(
        lambda bio: evp.RSA_print(bio, key, 0),
        "Could not stringify key")
    return key, _parse_printed_key(key_string)
|
|
|
|
|
|
|
|
|
|
def _random_buffer(length, purpose):
    """Return a ctypes buffer holding `length` bytes from evp.RAND_bytes.

    RAND_bytes is retried up to 1000 times. EnvelopeError naming `purpose`
    is raised if it never reports success.
    """
    buf = ctypes.create_string_buffer(length)
    for _ in range(1000):
        # OpenSSL's RAND_bytes reports success only with 1; it can also
        # return -1, which is truthy and must not be taken for success
        if evp.RAND_bytes(buf, length) == 1:
            return buf
    raise EnvelopeError("Could not generate enough entropy for " + purpose)


def encrypt(data, keyfile=None, key=None):
    """Encrypts the given data, raising EnvelopeError on failure.

    This uses AES192 in CBC mode to do bulk encryption under a random AES
    key, and RSA (PKCS#1 padding) to encrypt that AES key against the
    given public key.

    data    -- the bytes to encrypt; must not be empty
    keyfile -- path of a PEM public key file
    key     -- PEM public key text

    Exactly one of `keyfile` and `key` must be given.

    Returns (iv, encrypted_aes_key, ciphertext).

    Usage:
    >>> from evpy import envelope
    >>> f = open("test/short.txt", "rb")
    >>> data = f.read()
    >>> public_key = "test/keys/public1.pem"
    >>> private_key = "test/keys/private1.pem"
    >>> iv, key, ciphertext = envelope.encrypt(data, public_key)
    >>> envelope.decrypt(iv, key, ciphertext, private_key) == data
    True
    """
    # validate the arguments before any OpenSSL object is allocated, so a
    # bad call does not leave a cipher context behind
    if data is None:
        raise EnvelopeError("Incoming data must be bytes")
    if not data:
        raise EnvelopeError("Data must actually exist")
    if bool(key) == bool(keyfile):
        raise EnvelopeError("Must specify exactly one key or keyfile")

    # build and initialize the context
    ctx = evp.EVP_CIPHER_CTX_new()
    if not ctx:
        raise EnvelopeError("Could not create context")
    evp.EVP_CIPHER_CTX_init(ctx)

    # load the public key from whichever source was given
    if key:
        ekey = _build_ekey_from_string(key)
    else:
        ekey = _build_ekey_from_file(keyfile)

    # get the cipher object
    cipher_object = evp.EVP_aes_192_cbc()
    if not cipher_object:
        raise EnvelopeError("Could not create cipher object")

    # attach the cipher to the context; key and IV are supplied later
    if not evp.EVP_EncryptInit_ex(ctx, cipher_object, None, None, None):
        raise EnvelopeError("Could not finish context")

    # build the randomized iv
    iv = _random_buffer(evp.EVP_CIPHER_CTX_iv_length(ctx), "IV")
    output_iv = iv.raw

    # build the randomized AES key
    keysize = evp.EVP_CIPHER_key_length(cipher_object)
    aes_key = _random_buffer(keysize, "AES key")

    # extract the RSA key
    rsa_key = evp.EVP_PKEY_get1_RSA(ekey)
    if not rsa_key:
        raise EnvelopeError("Could not get RSA key")

    # encrypt the AES key with it
    buf_size = evp.RSA_size(rsa_key)
    if not buf_size:
        raise EnvelopeError("Invalid RSA keysize")
    encrypted_aes_key = ctypes.create_string_buffer(buf_size)
    # RSA_PKCS1_PADDING is defined as 1
    written = evp.RSA_public_encrypt(keysize, aes_key, encrypted_aes_key, rsa_key, 1)
    # OpenSSL's RSA_public_encrypt signals an error with -1, which a plain
    # truth test would accept
    if written <= 0:
        raise EnvelopeError("Could not encrypt AES key")
    output_key = encrypted_aes_key.raw[:written]

    # initialize the encryption operation with the real key and IV
    if not evp.EVP_EncryptInit_ex(ctx, None, None, aes_key, iv):
        raise EnvelopeError("Could not start encryption operation")

    # build the output buffer, leaving room for one block of padding
    buf = ctypes.create_string_buffer(len(data) + 16)
    written = ctypes.c_int(0)
    final = ctypes.c_int(0)

    # update
    if not evp.EVP_EncryptUpdate(ctx, buf, ctypes.byref(written), data, len(data)):
        raise EnvelopeError("Could not update ciphertext")
    output = buf.raw[:written.value]

    # finalize; the buffer is reused because its contents were copied above
    if not evp.EVP_EncryptFinal_ex(ctx, buf, ctypes.byref(final)):
        raise EnvelopeError("Could not finalize ciphertext")
    output += buf.raw[:final.value]

    return output_iv, output_key, output
|
|
|
|
|
|
def decrypt(iv, encrypted_aes_key, data, keyfile=None, key=None):
    """Decrypts the given ciphertext, raising EnvelopeError on failure.

    iv                -- the IV returned by encrypt()
    encrypted_aes_key -- the RSA-encrypted AES key returned by encrypt()
    data              -- the ciphertext returned by encrypt()
    keyfile           -- path of a PEM private key file
    key               -- PEM private key text

    Exactly one of `keyfile` and `key` must be given.

    Returns the decrypted data.

    Usage:
    >>> from evpy import envelope
    >>> f = open("test/short.txt", "rb")
    >>> data = f.read()
    >>> public_key = "test/keys/public1.pem"
    >>> private_key = "test/keys/private1.pem"
    >>> iv, key, ciphertext = envelope.encrypt(data, public_key)
    >>> envelope.decrypt(iv, key, ciphertext, private_key) == data
    True
    """
    # validate the arguments before any OpenSSL object is allocated, so a
    # bad call raises EnvelopeError (not TypeError from len()) and does not
    # leave a cipher context behind
    if not iv or not encrypted_aes_key or not data:
        raise EnvelopeError("Must specify an IV, an encrypted AES key and ciphertext")
    if bool(key) == bool(keyfile):
        raise EnvelopeError("Must specify exactly one key or keyfile")

    # build and initialize the context
    ctx = evp.EVP_CIPHER_CTX_new()
    if not ctx:
        raise EnvelopeError("Could not create context")
    evp.EVP_CIPHER_CTX_init(ctx)

    # get the cipher object
    cipher_object = evp.EVP_aes_192_cbc()
    if not cipher_object:
        raise EnvelopeError("Could not create cipher object")

    # load the private key from whichever source was given
    if key:
        dkey = _build_dkey_from_string(key)
    else:
        dkey = _build_dkey_from_file(keyfile)

    # open the envelope
    if not evp.EVP_OpenInit(ctx, cipher_object, encrypted_aes_key, len(encrypted_aes_key), iv, dkey):
        raise EnvelopeError("Could not open envelope")

    # build the output buffer, with one spare block beyond the input size
    buf = ctypes.create_string_buffer(len(data) + 16)
    written = ctypes.c_int(0)
    final = ctypes.c_int(0)

    # update
    if not evp.EVP_DecryptUpdate(ctx, buf, ctypes.byref(written), data, len(data)):
        raise EnvelopeError("Could not update envelope")
    output = buf.raw[:written.value]

    # finalize; the buffer is reused because its contents were copied above
    if not evp.EVP_DecryptFinal_ex(ctx, buf, ctypes.byref(final)):
        raise EnvelopeError("Could not finalize envelope")
    output += buf.raw[:final.value]

    return output
|