This commit is contained in:
vladimir-v-diaz 2013-09-10 10:20:34 -04:00
commit 87df16efe4
49 changed files with 404 additions and 1999 deletions

1
.gitignore vendored
View file

@ -8,3 +8,4 @@ build/*
*.session
*.swo
*.swp
tuf.egg-info

View file

@ -1,4 +1,4 @@
# A Framework for Securing Software Update Systems
## A Framework for Securing Software Update Systems
TUF (The Update Framework) helps developers secure their new or existing
software update systems. Software update systems are vulnerable to many known
@ -6,7 +6,7 @@ attacks, including those that can result in clients being compromised or
crashed. TUF helps solve this problem by providing a flexible security
framework that can be added to software updaters.
# What Is a Software Update System?
## What Is a Software Update System?
Generally, a software update system is an application (or part of an
application) running on a client system that obtains and installs software.
@ -27,7 +27,7 @@ Python's pip/easy_install + PyPI, Perl's CPAN, Ruby's Gems, and PHP's PEAR.
of the software on a client system. Debian's APT, Red Hat's YUM, and openSUSE's
YaST are examples of these.
# Our Approach
## Our Approach
There are literally thousands of different software update systems in common
use today. (In fact the average Windows user has about two dozen different

View file

View file

@ -1,200 +0,0 @@
#! /usr/bin/env python
"""
cipher.py
Written by Geremy Condra
Released on 18 March 2010
Licensed under MIT License
This module provides a basic interface to OpenSSL's EVP
cipher functions.
All the functions in this module raise CipherError on
malfunction.
From an end-user perspective, this module should be used
in situations where you want to have a single generally
human-readable or human-generated key used for both
encryption and decryption.
This means that as a general rule, if your application
involves transmitting this key over an insecure channel
you should not be using this module, but rather
evpy.envelope.
Usage:
>>> from evpy import cipher
>>> message = b"this is data"
>>> pw = b"mypassword"
>>> salt, iv, enc = cipher.encrypt(message, pw)
>>> cipher.decrypt(salt, iv, enc, pw)
'this is data'
"""
import ctypes
import time
import evp
class CipherError(evp.SSLError):
pass
def _strengthen_password(pw, iv, salt=None):
# add the hash
evp.OpenSSL_add_all_digests()
# build the key buffer
key = ctypes.create_string_buffer(24)
# either take the existing salt or build a new one
if not salt:
salt = ctypes.create_string_buffer(8)
# get the needed entropy, bailing if it doesn't work in
# the first thousand tries
for i in range(1000):
if evp.RAND_bytes(salt, 8): break
else:
raise CipherError("Could not generate enough entropy")
# extract the salt
salt = salt.raw
# get the hash
evp_hash = evp.EVP_get_digestbyname("sha512")
if not evp_hash:
raise CipherError("Could not create hash object")
# fill the key
if not evp.EVP_BytesToKey(evp.EVP_aes_192_cbc(), evp_hash, salt, pw, len(pw), 1000, key, iv):
raise CipherError("Could not strengthen key")
# go home
return salt, key.raw
def encrypt(data, password):
"""Encrypts the given data, raising CipherError on failure.
This uses AES192 to encrypt and strengthens the given
passphrase using SHA512.
Usage:
>>> from evpy import cipher
>>> f = open("test/short.txt", "rb")
>>> data = f.read()
>>> pw = b"mypassword"
>>> salt, iv, enc = cipher.encrypt(data, pw)
>>> cipher.decrypt(salt, iv, enc, pw) == data
True
"""
# ensure data exists
if not len(data):
raise CipherError("Data must actually exist")
if not len(password):
raise CipherError("Password must actually exist")
# build and initialize the context
ctx = evp.EVP_CIPHER_CTX_new()
if not ctx:
raise CipherError("Could not create context")
evp.EVP_CIPHER_CTX_init(ctx)
# get the cipher object
cipher_object = evp.EVP_aes_192_cbc()
if not cipher_object:
raise CipherError("Could not create cipher object")
# finish the context and cipher object
if not evp.EVP_EncryptInit_ex(ctx, cipher_object, None, None, None):
raise CipherError("Could not finish context")
# build the randomized iv
iv_length = evp.EVP_CIPHER_CTX_iv_length(ctx)
iv = ctypes.create_string_buffer(iv_length)
# get the needed entropy, bailing if it doesn't work in
# the first thousand tries
for i in range(1000):
if evp.RAND_bytes(iv, iv_length): break
else:
raise CipherError("Not enough entropy for IV")
output_iv = iv.raw
# strengthen the password into an honest-to-goodness key
salt, aes_key = _strengthen_password(password, iv)
# initialize the encryption operation
if not evp.EVP_EncryptInit_ex(ctx, None, None, aes_key, iv):
raise CipherError("Could not start encryption operation")
# build the output buffer
buf = ctypes.create_string_buffer(len(data) + 16)
written = ctypes.c_int(0)
final = ctypes.c_int(0)
# update
if not evp.EVP_EncryptUpdate(ctx, buf, ctypes.byref(written), data, len(data)):
raise CipherError("Could not update ciphertext")
output = buf.raw[:written.value]
# finalize
if not evp.EVP_EncryptFinal_ex(ctx, buf, ctypes.byref(final)):
raise CipherError("Could not finalize ciphertext")
output += buf.raw[:final.value]
# ...and go home
return salt, output_iv, output
def decrypt(salt, iv, data, password):
"""Decrypts the given data, raising CipherError on failure.
Usage:
>>> from evpy import cipher
>>> f = open("test/short.txt", "rb")
>>> data = f.read()
>>> pw = b"mypassword"
>>> salt, iv, enc = cipher.encrypt(data, pw)
>>> cipher.decrypt(salt, iv, enc, pw) == data
True
"""
# ensure inputs are the correct size
if not len(data):
raise CipherError("Data must actually exist")
if not len(password):
raise CipherError("Password must actually exist")
if len(salt) != 8:
raise CipherError("Incorrect salt size")
if len(iv) != 16:
raise CipherError("Incorrect iv size")
# build and initialize the context
ctx = evp.EVP_CIPHER_CTX_new()
if not ctx:
raise CipherError("Could not create context")
evp.EVP_CIPHER_CTX_init(ctx)
# get the cipher object
cipher_object = evp.EVP_aes_192_cbc()
if not cipher_object:
raise CipherError("Could not create cipher object")
# build the key
salt, key = _strengthen_password(password, iv, salt)
# start decrypting the ciphertext
if not evp.EVP_DecryptInit_ex(ctx, cipher_object, None, key, iv):
raise CipherError("Could not open envelope")
# build the output buffers
buf = ctypes.create_string_buffer(len(data) + 16)
written = ctypes.c_int(0)
final = ctypes.c_int(0)
# update
if not evp.EVP_DecryptUpdate(ctx, buf, ctypes.byref(written), data, len(data)):
raise CipherError("Could not update plaintext")
output = buf.raw[:written.value]
# finalize
if not evp.EVP_DecryptFinal_ex(ctx, buf, ctypes.byref(final)):
raise CipherError("Could not finalize decryption")
output += buf.raw[:final.value]
return output

View file

@ -1,329 +0,0 @@
#! /usr/bin/env python
"""
envelope.py
Written by Geremy Condra
Released on 18 March 2010
Licensed under MIT License
This module provides a basic interface to OpenSSL's EVP
envelope functions.
In a nutshell, these functions are designed to provide
the primary benefit of public key cryptography (the
ability to provide secrecy without first sharing a
secret) without its primary downside (small message
length). It does this by generating a random AES key
with which to encrypt the data, then encrypting that
key against the provided RSA key.
This means that if you have an application in which
you wish to share sensitive data but do not wish to
share a common secret, this is your module. Be aware
that compromising your private key is effectively
game over with this scheme.
If you require a shared secret and want the key to
be human readable, then you will probably want to
use the cipher module instead.
All the functions in this module raise EnvelopeError on
malfunction.
Usage:
>>> from evpy import envelope
>>> f = open("test/short.txt", "rb")
>>> data = f.read()
>>> public_key = "test/keys/public1.pem"
>>> private_key = "test/keys/private1.pem"
>>> iv, key, ciphertext = envelope.encrypt(data, public_key)
>>> envelope.decrypt(iv, key, ciphertext, private_key) == data
True
"""
import ctypes
import evp
from signature import _string_to_bio
class EnvelopeError(evp.SSLError):
pass
class KeygenError(evp.SSLError):
pass
def _build_dkey_from_file(keyfile):
fp = evp.fopen(keyfile, "r")
if not fp:
raise EnvelopeError("Could not open keyfile")
# get the decryption key
skey = evp.PEM_read_PrivateKey(fp, None, None, None)
if not skey:
evp.fclose(fp)
raise EnvelopeError("Could not read decryption key")
# close the file
evp.fclose(fp)
return skey
def _build_dkey_from_string(key):
bio = _string_to_bio(key)
dkey = evp.PEM_read_bio_PrivateKey(bio, None, None, None)
if not dkey:
raise EnvelopeError("Could not build decryption key from string")
evp.BIO_free(bio)
return dkey
def _build_ekey_from_file(keyfile):
fp = evp.fopen(keyfile, "r")
if not fp:
raise EnvelopeError("Could not open keyfile")
# get the encryption key
ekey = evp.PEM_read_PUBKEY(fp, None, None, None)
if not ekey:
evp.fclose(fp)
raise EnvelopeError("Could not read encryption key")
# close the file
evp.fclose(fp)
return ekey
def _build_ekey_from_string(key):
bio = _string_to_bio(key)
ekey = evp.PEM_read_bio_PUBKEY(bio, None, None, None)
if not ekey:
raise EnvelopeError("Could not create encryption key from string")
evp.BIO_free(bio)
return ekey
def _build_bio():
method = evp.BIO_s_mem()
return evp.BIO_new(method);
def _asn1_hex_to_int(value):
print(value)
return int(''.join(value.split(':')), 16)
def _parse_printed_key(k):
attrs = {}
current = ""
current_attr = ""
for line in k.splitlines()[1:]:
# its a continuation of the current block
if line.startswith(' '):
current += line.strip()
else:
# special case the public exponent
if "publicExponent" in current_attr:
attrs['publicExponent'] = int(current_attr.split()[1])
elif current_attr:
attrs[current_attr] = _asn1_hex_to_int(current)
current_attr = line.strip(':')
current = ""
translator = {'publicExponent': 'e', 'privateExponent': 'd', 'modulus': 'n', 'prime1': 'p', 'prime2': 'q'}
translated_attrs = {}
for key, value in attrs.items():
try:
translated_attrs[translator[key]] = value
except: pass
return translated_attrs
def keygen(bitlength=1024, e=65537, pem=True):
key = evp.RSA_generate_key(bitlength, e, None, None)
if not key:
raise EnvelopeError("Could not generate key")
if pem:
private_bio = evp.BIO_new(evp.BIO_s_mem())
if not private_bio:
raise KeygenError("Could not create temporary storage")
public_bio = evp.BIO_new(evp.BIO_s_mem())
if not public_bio:
raise KeygenError("Could not create temporary storage")
private_buf = ctypes.create_string_buffer('', 65537)
if not private_buf:
raise MemoryError("Could not allocate key storage")
public_buf = ctypes.create_string_buffer('', 65537)
if not public_buf:
raise MemoryError("Could not allocate key storage")
if not evp.PEM_write_bio_RSAPrivateKey(private_bio, key, None, None, 0, 0, None):
raise KeygenError("Could not write private key")
if not evp.PEM_write_bio_RSA_PUBKEY(public_bio, key):
raise KeygenError("Could not write public key")
public_len = evp.BIO_read(public_bio, public_buf, 65537)
private_len = evp.BIO_read(private_bio, private_buf, 65537)
evp.BIO_free(public_bio)
evp.BIO_free(private_bio)
return public_buf.value, private_buf.value
else:
# we go through this rigamarole because if there's an engine
# in place it won't populate the RSA key's values properly.
key_bio = evp.BIO_new(evp.BIO_s_mem())
if not key_bio:
raise KeygenError("Could not create temporary storage")
if not evp.RSA_print(key_bio, key, 0):
raise KeygenError("Could not stringify key")
key_buf = ctypes.create_string_buffer('', 65537)
if not key_buf:
raise MemoryError("Could not allocate key storage")
evp.BIO_read(key_bio, key_buf, 65537)
evp.BIO_free(key_bio)
key_string = key_buf.value
return key, _parse_printed_key(key_string)
def encrypt(data, keyfile=None, key=None):
"""Encrypts the given data, raising EnvelopeError on failure.
This uses AES192 to do bulk encryption and RSA to encrypt
the given public key.
Usage:
>>> from evpy import envelope
>>> f = open("test/short.txt", "rb")
>>> data = f.read()
>>> public_key = "test/keys/public1.pem"
>>> private_key = "test/keys/private1.pem"
>>> iv, key, ciphertext = envelope.encrypt(data, public_key)
>>> envelope.decrypt(iv, key, ciphertext, private_key) == data
True
"""
# validate the incoming data
if not data:
raise EnvelopeError("Incoming data must be bytes")
if not len(data):
raise EnvelopeError("Data must actually exist")
# build and initialize the context
ctx = evp.EVP_CIPHER_CTX_new()
if not ctx:
raise EnvelopeError("Could not create context")
evp.EVP_CIPHER_CTX_init(ctx)
# get the key from the keyfile
if key and not keyfile:
ekey = _build_ekey_from_string(key)
elif keyfile and not key:
ekey = _build_ekey_from_file(keyfile)
else:
raise EnvelopeError("Must specify exactly one key or keyfile")
# get the cipher object
cipher_object = evp.EVP_aes_192_cbc()
if not cipher_object:
raise EnvelopeError("Could not create cipher object")
# finish the context and cipher object
if not evp.EVP_EncryptInit_ex(ctx, cipher_object, None, None, None):
raise EnvelopeError("Could not finish context")
# build the randomized iv
iv_length = evp.EVP_CIPHER_CTX_iv_length(ctx)
iv = ctypes.create_string_buffer(iv_length)
for i in range(1000):
if evp.RAND_bytes(iv, iv_length): break
else:
raise EnvelopeError("Could not generate enough entropy for IV")
output_iv = iv.raw
# build the randomized AES key
keysize = evp.EVP_CIPHER_key_length(cipher_object)
aes_key = ctypes.create_string_buffer(keysize)
for i in range(1000):
if evp.RAND_bytes(aes_key, keysize): break
else:
raise EnvelopeError("Could not generate enough entropy for AES key")
# extract the RSA key
rsa_key = evp.EVP_PKEY_get1_RSA(ekey)
if not rsa_key:
raise EnvelopeError("Could not get RSA key")
# encrypt it
buf_size = evp.RSA_size(rsa_key)
if not buf_size:
raise EnvelopeError("Invalid RSA keysize")
encrypted_aes_key = ctypes.create_string_buffer(buf_size)
# RSA_PKCS1_PADDING is defined as 1
written = evp.RSA_public_encrypt(keysize, aes_key, encrypted_aes_key, rsa_key, 1)
if not written:
raise EnvelopeError("Could not encrypt AES key")
output_key = encrypted_aes_key.raw[:written]
# initialize the encryption operation
if not evp.EVP_EncryptInit_ex(ctx, None, None, aes_key, iv):
raise EnvelopeError("Could not start encryption operation")
# build the output buffer
buf = ctypes.create_string_buffer(len(data) + 16)
written = ctypes.c_int(0)
final = ctypes.c_int(0)
# update
if not evp.EVP_EncryptUpdate(ctx, buf, ctypes.byref(written), data, len(data)):
raise EnvelopeError("Could not update ciphertext")
output = buf.raw[:written.value]
# finalize
if not evp.EVP_EncryptFinal_ex(ctx, buf, ctypes.byref(final)):
raise EnvelopeError("Could not finalize ciphertext")
output += buf.raw[:final.value]
# ...and go home
return output_iv, output_key, output
def decrypt(iv, encrypted_aes_key, data, keyfile=None, key=None):
"""Decrypts the given ciphertext, raising EnvelopeError on failure.
Usage:
>>> from evpy import envelope
>>> f = open("test/short.txt", "rb")
>>> data = f.read()
>>> public_key = "test/keys/public1.pem"
>>> private_key = "test/keys/private1.pem"
>>> iv, key, ciphertext = envelope.encrypt(data, public_key)
>>> envelope.decrypt(iv, key, ciphertext, private_key) == data
True
"""
# build and initialize the context
ctx = evp.EVP_CIPHER_CTX_new()
if not ctx:
raise EnvelopeError("Could not create context")
evp.EVP_CIPHER_CTX_init(ctx)
# get the cipher object
cipher_object = evp.EVP_aes_192_cbc()
if not cipher_object:
raise EnvelopeError("Could not create cipher object")
# get the key from the keyfile
if key and not keyfile:
dkey = _build_dkey_from_string(key)
elif keyfile and not key:
dkey = _build_dkey_from_file(keyfile)
else:
raise EnvelopeError("Must specify exactly one key or keyfile")
# open the envelope
if not evp.EVP_OpenInit(ctx, cipher_object, encrypted_aes_key, len(encrypted_aes_key), iv, dkey):
raise EnvelopeError("Could not open envelope")
# build the output buffer
buf = ctypes.create_string_buffer(len(data) + 16)
written = ctypes.c_int(0)
final = ctypes.c_int(0)
# update
if not evp.EVP_DecryptUpdate(ctx, buf, ctypes.byref(written), data, len(data)):
raise EnvelopeError("Could not update envelope")
output = buf.raw[:written.value]
# finalize
if not evp.EVP_DecryptFinal_ex(ctx, buf, ctypes.byref(final)):
raise EnvelopeError("Could not finalize envelope")
output += buf.raw[:final.value]
return output

View file

@ -1,318 +0,0 @@
#! /usr/bin/env python
import ctypes
import ctypes.util
import platform
from os import linesep
def handle_errors():
ERR_load_crypto_strings()
errno = ERR_get_error()
errbuf = ctypes.create_string_buffer(1024)
ERR_error_string_n(errno, errbuf, 1024)
return errbuf.value.decode("ascii")
class SSLError(Exception):
def __init__(self, msg):
sslerr = handle_errors()
Exception.__init__(self, msg + linesep + sslerr)
libraries = {}
libraries["c"] = ctypes.CDLL(ctypes.util.find_library("c"))
if platform.system() != "Windows":
libraries["ssl"] = ctypes.CDLL(ctypes.util.find_library("ssl"))
else:
libraries["ssl"] = ctypes.CDLL(ctypes.util.find_library("libeay32"))
class RSA(ctypes.Structure):
_fields_ = [ ("n", ctypes.c_void_p),
("e", ctypes.c_void_p),
("d", ctypes.c_void_p),
("p", ctypes.c_void_p),
("q", ctypes.c_void_p),
("dmp1", ctypes.c_void_p),
("iqmp", ctypes.c_void_p)]
def handle_errors():
ERR_load_crypto_strings()
errno = ERR_get_error()
errbuf = ctypes.create_string_buffer(1024)
ERR_error_string_n(errno, errbuf, 1024)
return errbuf.value.decode("ascii")
fopen = libraries['c'].fopen
fopen.restype = ctypes.c_void_p
fopen.argtypes = [ctypes.c_char_p, ctypes.c_char_p]
fclose = libraries['c'].fclose
fclose.restype = ctypes.c_int
fclose.argtypes = [ctypes.c_void_p]
ERR_load_crypto_strings = libraries['ssl'].ERR_load_crypto_strings
ERR_load_crypto_strings.restype = ctypes.c_int
ERR_load_crypto_strings.argtypes = []
ERR_print_errors_fp = libraries['ssl'].ERR_print_errors_fp
ERR_print_errors_fp.restype = ctypes.c_int
ERR_print_errors_fp.argtypes = [ctypes.c_void_p]
OpenSSL_add_all_digests = libraries['ssl'].OpenSSL_add_all_digests
OpenSSL_add_all_digests.restype = ctypes.c_int
OpenSSL_add_all_digests.argtypes = []
EVP_MD_CTX_create = libraries['ssl'].EVP_MD_CTX_create
EVP_MD_CTX_create.restype = ctypes.c_void_p
EVP_MD_CTX_create.argtypes = []
PEM_read_PrivateKey = libraries['ssl'].PEM_read_PrivateKey
PEM_read_PrivateKey.restype = ctypes.c_void_p
PEM_read_PrivateKey.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p]
PEM_read_X509 = libraries['ssl'].PEM_read_X509
PEM_read_X509.restype = ctypes.c_void_p
PEM_read_X509.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p]
PEM_read_PUBKEY = libraries['ssl'].PEM_read_PUBKEY
PEM_read_PUBKEY.restype = ctypes.c_void_p
PEM_read_PUBKEY.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p]
PEM_read_bio_PUBKEY = libraries['ssl'].PEM_read_bio_PUBKEY
PEM_read_bio_PUBKEY.restype = ctypes.c_void_p
PEM_read_bio_PUBKEY.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p]
BIO_free = libraries['ssl'].BIO_free
BIO_free.restype = ctypes.c_int
BIO_free.argtypes = [ctypes.c_void_p]
PEM_read_bio_PrivateKey = libraries['ssl'].PEM_read_bio_PrivateKey
PEM_read_bio_PrivateKey.restype = ctypes.c_void_p
PEM_read_bio_PrivateKey.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p]
EVP_get_digestbyname = libraries['ssl'].EVP_get_digestbyname
EVP_get_digestbyname.restype = ctypes.c_void_p
EVP_get_digestbyname.argtypes = [ctypes.c_char_p]
EVP_DigestInit = libraries['ssl'].EVP_DigestInit
EVP_DigestInit.restype = ctypes.c_int
EVP_DigestInit.argtypes = [ctypes.c_void_p, ctypes.c_void_p]
EVP_DigestUpdate = libraries['ssl'].EVP_DigestUpdate
EVP_DigestUpdate.restype = ctypes.c_int
EVP_DigestUpdate.argtypes = [ctypes.c_void_p, ctypes.c_char_p, ctypes.c_int]
EVP_SignFinal = libraries['ssl'].EVP_SignFinal
EVP_SignFinal.restype = ctypes.c_int
EVP_SignFinal.argtypes = [ctypes.c_void_p, ctypes.c_char_p, ctypes.POINTER(ctypes.c_int), ctypes.c_void_p]
EVP_VerifyFinal = libraries['ssl'].EVP_VerifyFinal
EVP_VerifyFinal.restype = ctypes.c_int
EVP_VerifyFinal.argtypes = [ctypes.c_void_p, ctypes.c_char_p, ctypes.c_int, ctypes.c_void_p]
EVP_PKEY_free = libraries['ssl'].EVP_PKEY_free
EVP_PKEY_free.restype = ctypes.c_int
EVP_PKEY_free.argtypes = [ctypes.c_void_p]
EVP_MD_CTX_cleanup = libraries['ssl'].EVP_MD_CTX_cleanup
EVP_MD_CTX_cleanup.restype = ctypes.c_int
EVP_MD_CTX_cleanup.argtypes = [ctypes.c_void_p]
EVP_MD_CTX_destroy = libraries['ssl'].EVP_MD_CTX_destroy
EVP_MD_CTX_destroy.restype = ctypes.c_int
EVP_MD_CTX_destroy.argtypes = [ctypes.c_void_p]
EVP_CIPHER_CTX_new = libraries['ssl'].EVP_CIPHER_CTX_new
EVP_CIPHER_CTX_new.restype = ctypes.c_void_p
EVP_CIPHER_CTX_new.argtypes = []
EVP_CIPHER_CTX_init = libraries['ssl'].EVP_CIPHER_CTX_init
EVP_CIPHER_CTX_init.restype = ctypes.c_int
EVP_CIPHER_CTX_init.argtypes = [ctypes.c_void_p]
try:
EVP_CIPHER_CTX_iv_length = libraries['ssl'].EVP_CIPHER_CTX_iv_length
EVP_CIPHER_CTX_iv_length.restype = ctypes.c_int
EVP_CIPHER_CTX_iv_length.argtypes = [ctypes.c_void_p]
except:
EVP_CIPHER_CTX_iv_length = lambda(x): 16
EVP_aes_192_cbc = libraries['ssl'].EVP_aes_192_cbc
EVP_aes_192_cbc.restype = ctypes.c_void_p
EVP_aes_192_cbc.argtypes = []
EVP_PKEY_size = libraries['ssl'].EVP_PKEY_size
EVP_PKEY_size.restype = ctypes.c_int
EVP_PKEY_size.argtypes = [ctypes.c_void_p]
EVP_SealInit = libraries['ssl'].EVP_SealInit
EVP_SealInit.restype = ctypes.c_int
EVP_SealInit.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_char_p, ctypes.POINTER(ctypes.c_int), ctypes.c_char_p, ctypes.c_void_p, ctypes.c_int]
EVP_EncryptInit_ex = libraries['ssl'].EVP_EncryptInit_ex
EVP_EncryptInit_ex.restype = ctypes.c_int
EVP_EncryptInit_ex.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_char_p, ctypes.c_char_p]
EVP_EncryptUpdate = libraries['ssl'].EVP_EncryptUpdate
EVP_EncryptUpdate.restype = ctypes.c_int
EVP_EncryptUpdate.argtypes = [ctypes.c_void_p, ctypes.c_char_p, ctypes.POINTER(ctypes.c_int), ctypes.c_char_p, ctypes.c_int]
EVP_DecryptUpdate = libraries['ssl'].EVP_DecryptUpdate
EVP_DecryptUpdate.restype = ctypes.c_int
EVP_DecryptUpdate.argtypes = [ctypes.c_void_p, ctypes.c_char_p, ctypes.POINTER(ctypes.c_int), ctypes.c_char_p, ctypes.c_int]
EVP_EncryptFinal_ex = libraries['ssl'].EVP_EncryptFinal_ex
EVP_EncryptFinal_ex.restype = ctypes.c_int
EVP_EncryptFinal_ex.argtypes = [ctypes.c_void_p, ctypes.c_char_p, ctypes.POINTER(ctypes.c_int)]
EVP_SealFinal = libraries['ssl'].EVP_SealFinal
EVP_SealFinal.restype = ctypes.c_int
EVP_SealFinal.argtypes = [ctypes.c_void_p, ctypes.c_char_p, ctypes.POINTER(ctypes.c_int)]
EVP_DecryptFinal_ex = libraries['ssl'].EVP_DecryptFinal_ex
EVP_DecryptFinal_ex.restype = ctypes.c_int
EVP_DecryptFinal_ex.argtypes = [ctypes.c_void_p, ctypes.c_char_p, ctypes.POINTER(ctypes.c_int)]
RAND_bytes = libraries['ssl'].RAND_bytes
RAND_bytes.restype = ctypes.c_int
RAND_bytes.argtypes = [ctypes.c_char_p, ctypes.c_int]
BIO_new_mem_buf = libraries['ssl'].BIO_new_mem_buf
BIO_new_mem_buf.restype = ctypes.c_void_p
BIO_new_mem_buf.argtypes = [ctypes.c_char_p, ctypes.c_int]
EVP_CIPHER_CTX_rand_key = libraries['ssl'].EVP_CIPHER_CTX_rand_key
EVP_CIPHER_CTX_rand_key.restype = ctypes.c_int
EVP_CIPHER_CTX_rand_key.argtypes = [ctypes.c_void_p, ctypes.c_char_p]
try:
EVP_CIPHER_key_length = libraries['ssl'].EVP_CIPHER_key_length
EVP_CIPHER_key_length.restype = ctypes.c_int
EVP_CIPHER_key_length.argtypes = [ctypes.c_void_p]
except:
EVP_CIPHER_key_length = lambda(x): 24
EVP_PKEY_encrypt = libraries['ssl'].EVP_PKEY_encrypt
EVP_PKEY_encrypt.restype = ctypes.c_int
EVP_PKEY_encrypt.argtypes = [ctypes.c_char_p, ctypes.c_char_p, ctypes.c_int, ctypes.c_void_p]
EVP_OpenInit = libraries['ssl'].EVP_OpenInit
EVP_OpenInit.restype = ctypes.c_int
EVP_OpenInit.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_char_p, ctypes.c_int, ctypes.c_char_p, ctypes.c_void_p]
EVP_PKEY_size = libraries['ssl'].EVP_PKEY_size
EVP_PKEY_size.restype = ctypes.c_int
EVP_PKEY_size.argtypes = [ctypes.c_void_p]
RSA_public_encrypt = libraries['ssl'].RSA_public_encrypt
RSA_public_encrypt.restype = ctypes.c_int
RSA_public_encrypt.argtypes = [ctypes.c_int, ctypes.c_char_p, ctypes.c_char_p, ctypes.c_void_p, ctypes.c_int]
EVP_PKEY_get1_RSA = libraries['ssl'].EVP_PKEY_get1_RSA
EVP_PKEY_get1_RSA.restype = ctypes.c_void_p
EVP_PKEY_get1_RSA.argtypes = [ctypes.c_void_p]
EVP_DecryptInit_ex = libraries['ssl'].EVP_DecryptInit_ex
EVP_DecryptInit_ex.restype = ctypes.c_int
EVP_DecryptInit_ex.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_char_p, ctypes.c_char_p]
EVP_CIPHER_CTX_set_key_length = libraries['ssl'].EVP_CIPHER_CTX_set_key_length
EVP_CIPHER_CTX_set_key_length.restype = ctypes.c_int
EVP_CIPHER_CTX_set_key_length.argtypes = [ctypes.c_void_p, ctypes.c_int]
EVP_BytesToKey = libraries['ssl'].EVP_BytesToKey
EVP_BytesToKey.restype = ctypes.c_int
EVP_BytesToKey.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_char_p, ctypes.c_char_p, ctypes.c_int, ctypes.c_int, ctypes.c_char_p, ctypes.c_char_p]
ERR_get_error = libraries['ssl'].ERR_get_error
ERR_get_error.restype = ctypes.c_long
ERR_get_error.argtypes = []
ERR_error_string_n = libraries['ssl'].ERR_error_string_n
ERR_error_string_n.restype = ctypes.c_void_p
ERR_error_string_n.argtypes = [ctypes.c_long, ctypes.c_char_p, ctypes.c_int]
RSA_size = libraries['ssl'].RSA_size
RSA_size.restype = ctypes.c_int
RSA_size.argtypes = [ctypes.c_void_p]
RSA_new = libraries['ssl'].RSA_new
RSA_new.restype = ctypes.POINTER(RSA)
RSA_generate_key = libraries['ssl'].RSA_generate_key
RSA_generate_key.restype = ctypes.POINTER(RSA)
RSA_generate_key.argtypes = [ctypes.c_int, ctypes.c_ulong, ctypes.c_void_p, ctypes.c_void_p]
RSA_print = libraries['ssl'].RSA_print
RSA_print.restype = ctypes.c_int
RSA_print.argtypes = [ctypes.c_void_p, ctypes.POINTER(RSA), ctypes.c_int]
PEM_write_bio_RSA_PUBKEY = libraries['ssl'].PEM_write_bio_RSA_PUBKEY
PEM_write_bio_RSAPrivateKey = libraries['ssl'].PEM_write_bio_RSAPrivateKey
PEM_write_bio_RSAPrivateKey.restype = ctypes.c_int
PEM_write_bio_RSAPrivateKey.argtypes = [ctypes.c_void_p, ctypes.POINTER(RSA), ctypes.c_void_p, ctypes.c_char_p, ctypes.c_int, ctypes.c_void_p, ctypes.c_void_p]
PEM_write_RSAPrivateKey = libraries['ssl'].PEM_write_RSAPrivateKey
PEM_write_RSAPrivateKey.restype = ctypes.c_int
PEM_write_RSAPrivateKey.argtypes = [ctypes.c_void_p, ctypes.POINTER(RSA), ctypes.c_void_p, ctypes.c_char_p, ctypes.c_int, ctypes.c_void_p, ctypes.c_void_p]
i2d_RSAPrivateKey = libraries['ssl'].i2d_RSAPrivateKey
BIO_read = libraries['ssl'].BIO_read
BIO_s_mem = libraries['ssl'].BIO_s_mem
BIO_s_mem.restype = ctypes.c_void_p
BIO_new = libraries['ssl'].BIO_new
BIO_new.restype = ctypes.c_void_p
BIO_new.argtypes = [ctypes.c_void_p]
RAND_seed = libraries['ssl'].RAND_seed
RAND_seed.argtypes = [ctypes.c_void_p, ctypes.c_int]

View file

@ -1,215 +0,0 @@
#! /usr/bin/env python
"""
signature.py
Written by Geremy Condra
Released on 18 March 2010
Licensed under MIT License
This module provides a basic interface to OpenSSL's EVP
signature functions.
All functions in this module will raise a SignatureError
in the event of a malfunction.
The goal of cryptographic signatures is to provide some
degree of assurance that the data you are processing is
both coming from the person you think is sending it and
is what they sent.
Note that this does not encrypt data in the sense that
it does not provide secrecy for it, while evpy.cipher
and evpy.envelope provide secrecy but no other security
properties.
Usage:
>>> from evpy import signature
>>> data = b"abcdefg"
>>> public_key = "test/keys/public1.pem"
>>> private_key = "test/keys/private1.pem"
>>> s = signature.sign(data, private_key)
>>> signature.verify(data, s, public_key)
True
"""
import ctypes
import evp
class SignatureError(evp.SSLError):
pass
def sign(data, keyfile=None, key=None):
"""Signs the given data, raising SignatureError on failure.
Exactly one of keyfile, key should be given; if key is not
defined, then the key will be read from the given file.
Usage:
>>> from evpy import signature
>>> f = open("test/short.txt", "rb")
>>> data = f.read()
>>> public_key = "test/keys/public1.pem"
>>> private_key = "test/keys/private1.pem"
>>> s = signature.sign(data, private_key)
>>> signature.verify(data, s, public_key)
True
"""
# add the digests
evp.OpenSSL_add_all_digests()
# build the context
ctx = evp.EVP_MD_CTX_create()
if not ctx:
raise SignatureError("Could not create context")
# get the signing key
if key and not keyfile:
skey = _build_skey_from_string(key)
elif keyfile and not key:
skey = _build_skey_from_file(keyfile)
else:
raise SignatureError("Exactly one of key, keyfile must be specified")
# build the hash object
evp_hash = _build_hash()
if not evp.EVP_DigestInit(ctx, evp_hash):
_cleanup(skey, ctx)
raise SignatureError("Could not initialize signature")
# update
if not evp.EVP_DigestUpdate(ctx, data, len(data)):
_cleanup(skey, ctx)
raise SignatureError("Could not update signature")
# finalize
output_buflen = ctypes.c_int(evp.EVP_PKEY_size(skey))
output = ctypes.create_string_buffer(output_buflen.value)
if not evp.EVP_SignFinal(ctx, output, ctypes.byref(output_buflen), skey):
_cleanup(skey, ctx)
raise SignatureError("Could not finalize signature")
# cleanup
_cleanup(skey, ctx)
# and go home
return ctypes.string_at(output, output_buflen)
def verify(data, sig, keyfile=None, key=None):
"""Verifies the given signature, returning a boolean.
Exactly one of keyfile, key should be specified.
This function raises SignatureError on error.
Usage:
>>> from evpy import signature
>>> f = open("test/short.txt", "rb")
>>> data = f.read()
>>> public_key = "test/keys/public1.pem"
>>> private_key = "test/keys/private1.pem"
>>> s = signature.sign(data, private_key)
>>> signature.verify(data, s, public_key)
True
"""
# add the digests
evp.OpenSSL_add_all_digests()
# build the context
ctx = evp.EVP_MD_CTX_create()
if not ctx:
raise SignatureError("Could not create context")
# get the vkey
if key and not keyfile:
vkey = _build_vkey_from_string(key)
elif keyfile and not key:
vkey = _build_vkey_from_file(keyfile)
else:
raise SignatureError("Exactly one of key, keyfile must be specified")
# build the hash object
evp_hash = _build_hash()
if not evp.EVP_DigestInit(ctx, evp_hash):
_cleanup(vkey, ctx)
raise SignatureError("Could not initialize verifier")
# update
if not evp.EVP_DigestUpdate(ctx, data, len(data)):
_cleanup(vkey, ctx)
raise SignatureError("Could not update verifier")
# finalize
retcode = evp.EVP_VerifyFinal(ctx, sig, len(sig), vkey)
# cleanup
_cleanup(vkey, ctx)
# and go home
if retcode == 1:
return True
elif retcode == 0:
return False
else:
raise SignatureError("Error verifying signature")
def _cleanup(key, ctx):
evp.EVP_PKEY_free(key)
evp.EVP_MD_CTX_cleanup(ctx)
evp.EVP_MD_CTX_destroy(ctx)
def _string_to_bio(s):
return evp.BIO_new_mem_buf(s, len(s))
def _build_skey_from_file(keyfile):
fp = evp.fopen(keyfile, "r")
if not fp:
raise SignatureError("Could not open keyfile")
# get the signing key
skey = evp.PEM_read_PrivateKey(fp, None, None, None)
if not skey:
evp.fclose(fp)
raise SignatureError("Could not read signing key")
# close the file
evp.fclose(fp)
return skey
def _build_skey_from_string(key):
buf = ctypes.create_string_buffer(key)
bio = evp.BIO_new_mem_buf(buf, len(buf.value))
skey = evp.PEM_read_bio_PrivateKey(bio, None, None, None, None)
if not skey:
raise SignatureError("Could not construct signing key from the given string")
evp.BIO_free(bio)
return skey
def _build_vkey_from_file(keyfile):
fp = evp.fopen(keyfile, "r")
if not fp:
raise SignatureError("Could not open keyfile")
# get the verification key
vkey = evp.PEM_read_PUBKEY(fp, None, None, None)
if not vkey:
evp.fclose(fp)
raise SignatureError("Could not read verification key")
# close the file
evp.fclose(fp)
return vkey
def _build_vkey_from_string(key):
buf = ctypes.create_string_buffer(key)
bio = evp.BIO_new_mem_buf(buf, len(buf.value))
vkey = evp.PEM_read_bio_PUBKEY(bio, None, None, None)
if not vkey:
raise SignatureError("Could not construct verification key from the given string")
return vkey
def _build_hash():
evp_hash = evp.EVP_get_digestbyname("sha512")
if not evp_hash:
raise SignatureError("Could not create hash object")
return evp_hash

View file

@ -1,668 +0,0 @@
#! /usr/bin/env python
"""
test.py
Written by Geremy Condra
Licensed under MIT License
Released 21 March 2010
Simple unit tests for evpy
"""
import unittest
from evpy import evp
from evpy import cipher
from evpy import signature
from evpy import envelope
# locations for test data
TEST_DATA_DIR = "test/"
TEST_KEYS = TEST_DATA_DIR + "keys/"
# test text
STRING = open(TEST_DATA_DIR + "long.txt").read()
LONG = open(TEST_DATA_DIR + "long.txt", 'rb').read()
SHORT = open(TEST_DATA_DIR + "short.txt", 'rb').read()
UNICODE = open(TEST_DATA_DIR + "unicode.txt", 'rb').read()
NULL = open(TEST_DATA_DIR + "null.txt", 'rb').read()
TEST_TEXTS = { "long": LONG,
"short": SHORT,
"unicode": UNICODE,
"null": NULL}
# test keys
SHORT_SYMMETRIC = open(TEST_KEYS + "short_symmetric.txt", 'rb').read()
LONG_SYMMETRIC = open(TEST_KEYS + "long_symmetric.txt", 'rb').read()
SYMMETRIC_KEYS = [LONG_SYMMETRIC, SHORT_SYMMETRIC]
SYMMETRIC_STRING = open(TEST_KEYS + "short_symmetric.txt").read()
KEY_1 = TEST_KEYS + "private1.pem", TEST_KEYS + "public1.pem"
KEY_2 = TEST_KEYS + "private2.pem", TEST_KEYS + "public2.pem"
MISMATCH_1 = KEY_1[0], KEY_2[1]
MISMATCH_2 = KEY_2[0], KEY_1[1]
MISSING_PRIVATE_KEY = "notakey.pem", KEY_1[1]
MISSING_PUBLIC_KEY = KEY_1[0], "notakey.pem"
BLANK_PRIVATE_KEY = KEY_1[0], TEST_KEYS + "blank.pem"
BLANK_PUBLIC_KEY = TEST_KEYS + "blank.pem", KEY_1[1]
def run_n_times(f, g, n):
def err(*args, **kwargs):
if err.n > 0:
err.n -= 1
return f(*args, **kwargs)
else:
return g(*args, **kwargs)
err.n = n
return err
class TestCipher(unittest.TestCase):
def round_trip(self, key, text):
salt, iv, enc = cipher.encrypt(text, key)
output = cipher.decrypt(salt, iv, enc, key)
self.assertEqual(output, text, "Failed to round trip")
def test_round_trip_short_zero(self):
self.assertRaises(cipher.CipherError, self.round_trip, SHORT_SYMMETRIC, '')
def test_round_trip_short_long(self):
self.round_trip(SHORT_SYMMETRIC, LONG)
def test_round_trip_short_short(self):
self.round_trip(SHORT_SYMMETRIC, SHORT)
def test_round_trip_short_unicode(self):
self.round_trip(SHORT_SYMMETRIC, UNICODE)
def test_round_trip_short_null(self):
self.round_trip(SHORT_SYMMETRIC, NULL)
def test_round_trip_long_zero(self):
self.assertRaises(cipher.CipherError, self.round_trip, LONG_SYMMETRIC, '')
def test_round_trip_long_long(self):
self.round_trip(LONG_SYMMETRIC, LONG)
def test_round_trip_long_short(self):
self.round_trip(LONG_SYMMETRIC, SHORT)
def test_round_trip_long_unicode(self):
self.round_trip(LONG_SYMMETRIC, UNICODE)
def test_round_trip_long_null(self):
self.round_trip(LONG_SYMMETRIC, NULL)
def test_no_data(self):
iv, salt, enc = cipher.encrypt(UNICODE, SHORT_SYMMETRIC)
self.assertRaises(cipher.CipherError, cipher.decrypt, iv, salt, '', SHORT_SYMMETRIC)
def test_short_salt(self):
salt, iv, enc = cipher.encrypt(UNICODE, SHORT_SYMMETRIC)
self.assertRaises(cipher.CipherError, cipher.decrypt, salt[:-1], iv, enc, SHORT_SYMMETRIC)
def test_long_salt(self):
salt, iv, enc = cipher.encrypt(UNICODE, SHORT_SYMMETRIC)
self.assertRaises(cipher.CipherError, cipher.decrypt, salt+salt[:-1], iv, enc, SHORT_SYMMETRIC)
def test_short_iv(self):
salt, iv, enc = cipher.encrypt(UNICODE, SHORT_SYMMETRIC)
self.assertRaises(cipher.CipherError, cipher.decrypt, salt, iv[:-1], enc, SHORT_SYMMETRIC)
def test_long_iv(self):
salt, iv, enc = cipher.encrypt(UNICODE, SHORT_SYMMETRIC)
self.assertRaises(cipher.CipherError, cipher.decrypt, salt, iv+iv[:-1], enc, SHORT_SYMMETRIC)
def test_round_trip_no_password(self):
iv, salt, enc = cipher.encrypt(UNICODE, SHORT_SYMMETRIC)
self.assertRaises(cipher.CipherError, cipher.encrypt, UNICODE, '')
self.assertRaises(cipher.CipherError, cipher.decrypt, iv, salt, enc, '')
def test_round_trip_failure(self):
salt, iv, enc = cipher.encrypt(SHORT, SHORT_SYMMETRIC)
self.assertRaises(cipher.CipherError, cipher.decrypt, salt, iv, enc, LONG_SYMMETRIC)
salt, iv, enc = cipher.encrypt(NULL, SHORT_SYMMETRIC)
self.assertRaises(cipher.CipherError, cipher.decrypt, salt, iv, enc, LONG_SYMMETRIC)
def test_bad_rand_bytes(self):
iv, salt, enc = cipher.encrypt(SHORT, SHORT_SYMMETRIC)
rand_bytes = cipher.evp.RAND_bytes
cipher.evp.RAND_bytes = lambda a,b: 0
self.assertRaises(cipher.CipherError, cipher.encrypt, SHORT, SHORT_SYMMETRIC)
cipher.evp.RAND_bytes = rand_bytes
def test_bad_rand_bytes_1(self):
iv, salt, enc = cipher.encrypt(SHORT, SHORT_SYMMETRIC)
rand_bytes = cipher.evp.RAND_bytes
cipher.evp.RAND_bytes = lambda a,b: 0
self.assertRaises(cipher.CipherError, cipher.encrypt, SHORT, SHORT_SYMMETRIC)
cipher.evp.RAND_bytes = rand_bytes
def test_bad_rand_bytes_2(self):
iv, salt, enc = cipher.encrypt(SHORT, SHORT_SYMMETRIC)
rand_bytes = cipher.evp.RAND_bytes
cipher.evp.RAND_bytes = run_n_times(cipher.evp.RAND_bytes, lambda a,b:0, 1)
self.assertRaises(cipher.CipherError, cipher.encrypt, SHORT, SHORT_SYMMETRIC)
cipher.evp.RAND_bytes = rand_bytes
def test_bad_hash_by_name(self):
iv, salt, enc = cipher.encrypt(SHORT, SHORT_SYMMETRIC)
hash_by_name = cipher.evp.EVP_get_digestbyname
cipher.evp.EVP_get_digestbyname = lambda a: None
self.assertRaises(cipher.CipherError, cipher.encrypt, SHORT, SHORT_SYMMETRIC)
cipher.evp.EVP_get_digestbyname = hash_by_name
def test_bad_bytes_to_key(self):
iv, salt, enc = cipher.encrypt(SHORT, SHORT_SYMMETRIC)
bytes_to_key = cipher.evp.EVP_BytesToKey
cipher.evp.EVP_BytesToKey = lambda a,b,c,d,e,f,g,h: 0
self.assertRaises(cipher.CipherError, cipher.encrypt, SHORT, SHORT_SYMMETRIC)
self.assertRaises(cipher.CipherError, cipher.decrypt, iv, salt, enc, SHORT_SYMMETRIC)
cipher.evp.EVP_BytesToKey = bytes_to_key
def test_bad_ctx_new(self):
iv, salt, enc = cipher.encrypt(SHORT, SHORT_SYMMETRIC)
new_ctx = cipher.evp.EVP_CIPHER_CTX_new
cipher.evp.EVP_CIPHER_CTX_new = lambda: None
self.assertRaises(cipher.CipherError, cipher.encrypt, SHORT, SHORT_SYMMETRIC)
self.assertRaises(cipher.CipherError, cipher.decrypt, iv, salt, enc, SHORT_SYMMETRIC)
cipher.evp.EVP_CIPHER_CTX_new = new_ctx
def test_bad_cipher_object(self):
iv, salt, enc = cipher.encrypt(SHORT, SHORT_SYMMETRIC)
cipher_getter = cipher.evp.EVP_aes_192_cbc
cipher.evp.EVP_aes_192_cbc= lambda: None
self.assertRaises(cipher.CipherError, cipher.encrypt, SHORT, SHORT_SYMMETRIC)
self.assertRaises(cipher.CipherError, cipher.decrypt, iv, salt, enc, SHORT_SYMMETRIC)
cipher.evp.EVP_aes_192_cbc = cipher_getter
def test_bad_encrypt_init_1(self):
encrypt_init = cipher.evp.EVP_EncryptInit_ex
cipher.evp.EVP_EncryptInit_ex = lambda a,b,c,d,e: None
self.assertRaises(cipher.CipherError, cipher.encrypt, SHORT, SHORT_SYMMETRIC)
cipher.evp.EVP_EncryptInit_ex = encrypt_init
def test_bad_encrypt_init_2(self):
encrypt_init = cipher.evp.EVP_EncryptInit_ex
cipher.evp.EVP_EncryptInit_ex = run_n_times(cipher.evp.EVP_EncryptInit_ex, lambda a,b,c,d,e: None, 1)
self.assertRaises(cipher.CipherError, cipher.encrypt, SHORT, SHORT_SYMMETRIC)
cipher.evp.EVP_EncryptInit_ex = encrypt_init
def test_bad_encrypt_update(self):
encrypt_update = cipher.evp.EVP_EncryptUpdate
cipher.evp.EVP_EncryptUpdate = lambda a,b,c,d,e: None
self.assertRaises(cipher.CipherError, cipher.encrypt, SHORT, SHORT_SYMMETRIC)
cipher.evp.EVP_EncryptUpdate = encrypt_update
def test_bad_encrypt_final(self):
encrypt_final = cipher.evp.EVP_EncryptFinal_ex
cipher.evp.EVP_EncryptFinal_ex = lambda a,b,c: None
self.assertRaises(cipher.CipherError, cipher.encrypt, SHORT, SHORT_SYMMETRIC)
cipher.evp.EVP_EncryptFinal_ex = encrypt_final
def test_bad_decrypt_init(self):
iv, salt, enc = cipher.encrypt(SHORT, SHORT_SYMMETRIC)
decrypt_init = cipher.evp.EVP_DecryptInit_ex
cipher.evp.EVP_DecryptInit_ex = lambda a,b,c,d,e: None
self.assertRaises(cipher.CipherError, cipher.decrypt, iv, salt, enc, SHORT_SYMMETRIC)
cipher.evp.EVP_DecryptInit_ex = decrypt_init
def test_bad_decrypt_update(self):
iv, salt, enc = cipher.encrypt(SHORT, SHORT_SYMMETRIC)
decrypt_update = cipher.evp.EVP_DecryptUpdate
cipher.evp.EVP_DecryptUpdate = lambda a,b,c,d,e: None
self.assertRaises(cipher.CipherError, cipher.decrypt, iv, salt, enc, SHORT_SYMMETRIC)
cipher.evp.EVP_DecryptUpdate = decrypt_update
def test_bad_decrypt_final(self):
iv, salt, enc = cipher.encrypt(SHORT, SHORT_SYMMETRIC)
decrypt_final = cipher.evp.EVP_DecryptFinal_ex
cipher.evp.EVP_DecryptFinal_ex = lambda a,b,c: None
self.assertRaises(cipher.CipherError, cipher.decrypt, iv, salt, enc, SHORT_SYMMETRIC)
cipher.evp.EVP_DecryptFinal_ex = decrypt_final
class TestSignature(unittest.TestCase):
def round_trip(self, keys, text):
s = signature.sign(text, keys[0])
return signature.verify(text, s, keys[1])
def round_trip_strings(self, keys, text):
s = signature.sign(text, key=open(keys[0], 'rb').read())
v = signature.verify(text, s, key=open(keys[1], 'rb').read())
return v
def round_trip_all_keys(self, text):
self.assertTrue(self.round_trip(KEY_1, text))
self.assertTrue(self.round_trip(KEY_2, text))
self.assertFalse(self.round_trip(MISMATCH_1, text))
self.assertFalse(self.round_trip(MISMATCH_2, text))
self.assertTrue(self.round_trip_strings(KEY_1, text))
self.assertTrue(self.round_trip_strings(KEY_2, text))
self.assertFalse(self.round_trip_strings(MISMATCH_1, text))
self.assertFalse(self.round_trip_strings(MISMATCH_2, text))
def test_round_trip_long(self):
self.round_trip_all_keys(LONG)
def test_round_trip_short(self):
self.round_trip_all_keys(SHORT)
def test_round_trip_unicode(self):
self.round_trip_all_keys(UNICODE)
def test_round_trip_null(self):
self.round_trip_all_keys(NULL)
def test_round_trip_zero(self):
self.round_trip_all_keys('')
def test_arguments(self):
text = SHORT
keys = KEY_1
self.failUnlessRaises(signature.SignatureError, signature.sign, text, key=open(keys[0], 'rb').read(), keyfile=keys[0])
self.failUnlessRaises(signature.SignatureError, signature.sign, text)
s = signature.sign(text, keyfile=keys[0])
self.failUnlessRaises(signature.SignatureError, signature.verify, text, s, key=open(keys[1], 'rb').read(), keyfile=keys[1])
self.failUnlessRaises(signature.SignatureError, signature.verify, text, s)
def test_bad_ctx(self):
s = signature.sign(SHORT, KEY_1[0])
new_ctx = signature.evp.EVP_MD_CTX_create
signature.evp.EVP_MD_CTX_create = lambda: None
self.assertRaises(signature.SignatureError, signature.sign, SHORT, KEY_1[0])
self.assertRaises(signature.SignatureError, signature.verify, SHORT, s, KEY_1[1])
signature.evp.EVP_MD_CTX_create = new_ctx
def test_bad_hash(self):
s = signature.sign(SHORT, KEY_1[0])
new_hash = signature.evp.EVP_get_digestbyname
signature.evp.EVP_get_digestbyname = lambda a: None
self.assertRaises(signature.SignatureError, signature.sign, SHORT, KEY_1[0])
self.assertRaises(signature.SignatureError, signature.verify, SHORT, s, KEY_1[1])
signature.evp.EVP_get_digestbyname = new_hash
def test_bad_digest_init(self):
s = signature.sign(SHORT, KEY_1[0])
init = signature.evp.EVP_DigestInit
signature.evp.EVP_DigestInit = lambda a,b: None
self.assertRaises(signature.SignatureError, signature.sign, SHORT, KEY_1[0])
self.assertRaises(signature.SignatureError, signature.verify, SHORT, s, KEY_1[1])
signature.evp.EVP_DigestInit = init
def test_bad_digest_update(self):
s = signature.sign(SHORT, KEY_1[0])
update = signature.evp.EVP_DigestUpdate
signature.evp.EVP_DigestUpdate = lambda a,b,c: None
self.assertRaises(signature.SignatureError, signature.sign, SHORT, KEY_1[0])
self.assertRaises(signature.SignatureError, signature.verify, SHORT, s, KEY_1[1])
signature.evp.EVP_DigestUpdate = update
def test_bad_sign_final(self):
final = signature.evp.EVP_SignFinal
signature.evp.EVP_SignFinal = lambda a,b,c,d: None
self.assertRaises(signature.SignatureError, signature.sign, SHORT, KEY_1[0])
signature.evp.EVP_SignFinal = final
def test_bad_verify_final(self):
s = signature.sign(SHORT, KEY_1[0])
final = signature.evp.EVP_VerifyFinal
signature.evp.EVP_VerifyFinal = lambda a,b,c,d: None
self.assertRaises(signature.SignatureError, signature.verify, SHORT, s, KEY_1[1])
signature.evp.EVP_VerifyFinal = final
def test_bad_read_privatekey(self):
read_private_key = signature.evp.PEM_read_PrivateKey
signature.evp.PEM_read_PrivateKey = lambda a,b,c,d: None
self.assertRaises(signature.SignatureError, signature.sign, SHORT, KEY_1[0])
signature.evp.PEM_read_PrivateKey = read_private_key
def test_bad_read_publickey(self):
s = signature.sign(SHORT, KEY_1[0])
read_public_key = signature.evp.PEM_read_PUBKEY
signature.evp.PEM_read_PUBKEY = lambda a,b,c,d: None
self.assertRaises(signature.SignatureError, signature.verify, SHORT, s, KEY_1[1])
signature.evp.PEM_read_PUBKEY = read_public_key
def test_bad_read_bio_privatekey(self):
read_private_key = signature.evp.PEM_read_bio_PrivateKey
signature.evp.PEM_read_bio_PrivateKey = lambda a,b,c,d,e: None
self.assertRaises(signature.SignatureError, signature.sign, SHORT, key=open(KEY_1[0], 'rb').read())
signature.evp.PEM_read_bio_PrivateKey = read_private_key
def test_bad_read_bio_publickey(self):
s = signature.sign(SHORT, KEY_1[0])
read_public_key = signature.evp.PEM_read_bio_PUBKEY
signature.evp.PEM_read_bio_PUBKEY = lambda a,b,c,d: None
self.assertRaises(signature.SignatureError, signature.verify, SHORT, s, key=open(KEY_1[1], 'rb').read())
signature.evp.PEM_read_bio_PUBKEY = read_public_key
def test_bad_fopen(self):
s = signature.sign(SHORT, KEY_1[0])
file_open = signature.evp.fopen
signature.evp.fopen = lambda a,b: None
self.assertRaises(signature.SignatureError, signature.sign, SHORT, KEY_1[1])
self.assertRaises(signature.SignatureError, signature.verify, SHORT, s, KEY_1[0])
signature.evp.fopen = file_open
class TestEnvelope(unittest.TestCase):
def round_trip(self, keys, text):
iv, sym_key, enc = envelope.encrypt(text, keys[1])
return envelope.decrypt(iv, sym_key, enc, keys[0])
def round_trip_strings(self, keys, text):
iv, sym_key, enc = envelope.encrypt(text, key=open(keys[1], 'rb').read())
return envelope.decrypt(iv, sym_key, enc, key=open(keys[0], 'rb').read())
def test_round_trip_zero(self):
self.assertRaises(envelope.EnvelopeError, self.round_trip, KEY_1, '')
self.assertRaises(envelope.EnvelopeError, self.round_trip, KEY_2, '')
self.assertRaises(envelope.EnvelopeError, self.round_trip, MISMATCH_1, '')
self.assertRaises(envelope.EnvelopeError, self.round_trip, MISMATCH_2, '')
self.assertRaises(envelope.EnvelopeError, self.round_trip_strings, KEY_1, '')
self.assertRaises(envelope.EnvelopeError, self.round_trip_strings, KEY_2, '')
self.assertRaises(envelope.EnvelopeError, self.round_trip_strings, MISMATCH_1, '')
self.assertRaises(envelope.EnvelopeError, self.round_trip_strings, MISMATCH_2, '')
def test_round_trip_long(self):
self.assertEqual(self.round_trip_strings(KEY_1, LONG), LONG, "Failed to round trip")
self.assertEqual(self.round_trip_strings(KEY_2, LONG), LONG, "Failed to round trip")
self.assertRaises(envelope.EnvelopeError, self.round_trip_strings, MISMATCH_1, LONG)
self.assertRaises(envelope.EnvelopeError, self.round_trip_strings, MISMATCH_2, LONG)
self.assertEqual(self.round_trip(KEY_1, LONG), LONG, "Failed to round trip")
self.assertEqual(self.round_trip(KEY_2, LONG), LONG, "Failed to round trip")
self.assertRaises(envelope.EnvelopeError, self.round_trip, MISMATCH_1, LONG)
self.assertRaises(envelope.EnvelopeError, self.round_trip, MISMATCH_2, LONG)
def test_round_trip_short(self):
self.assertEqual(self.round_trip(KEY_1, SHORT), SHORT, "Failed to round trip")
self.assertEqual(self.round_trip(KEY_2, SHORT), SHORT, "Failed to round trip")
self.assertRaises(envelope.EnvelopeError, self.round_trip, MISMATCH_1, SHORT)
self.assertRaises(envelope.EnvelopeError, self.round_trip, MISMATCH_2, SHORT)
self.assertEqual(self.round_trip_strings(KEY_1, SHORT), SHORT, "Failed to round trip")
self.assertEqual(self.round_trip_strings(KEY_2, SHORT), SHORT, "Failed to round trip")
self.assertRaises(envelope.EnvelopeError, self.round_trip_strings, MISMATCH_1, SHORT)
self.assertRaises(envelope.EnvelopeError, self.round_trip_strings, MISMATCH_2, SHORT)
def test_round_trip_unicode(self):
self.assertEqual(self.round_trip(KEY_1, UNICODE), UNICODE, "Failed to round trip")
self.assertEqual(self.round_trip(KEY_2, UNICODE), UNICODE, "Failed to round trip")
self.assertRaises(envelope.EnvelopeError, self.round_trip, MISMATCH_1, UNICODE)
self.assertRaises(envelope.EnvelopeError, self.round_trip, MISMATCH_2, UNICODE)
self.assertEqual(self.round_trip_strings(KEY_1, UNICODE), UNICODE, "Failed to round trip")
self.assertEqual(self.round_trip_strings(KEY_2, UNICODE), UNICODE, "Failed to round trip")
self.assertRaises(envelope.EnvelopeError, self.round_trip_strings, MISMATCH_1, UNICODE)
self.assertRaises(envelope.EnvelopeError, self.round_trip_strings, MISMATCH_2, UNICODE)
def test_round_trip_null(self):
self.assertEqual(self.round_trip(KEY_1, NULL), NULL, "Failed to round trip")
self.assertEqual(self.round_trip(KEY_2, NULL), NULL, "Failed to round trip")
self.assertRaises(envelope.EnvelopeError, self.round_trip, MISMATCH_1, NULL)
self.assertRaises(envelope.EnvelopeError, self.round_trip, MISMATCH_2, NULL)
self.assertEqual(self.round_trip_strings(KEY_1, NULL), NULL, "Failed to round trip")
self.assertEqual(self.round_trip_strings(KEY_2, NULL), NULL, "Failed to round trip")
self.assertRaises(envelope.EnvelopeError, self.round_trip_strings, MISMATCH_1, NULL)
self.assertRaises(envelope.EnvelopeError, self.round_trip_strings, MISMATCH_2, NULL)
def test_bad_keys(self):
self.assertRaises(envelope.EnvelopeError, self.round_trip, MISSING_PUBLIC_KEY, SHORT)
self.assertRaises(envelope.EnvelopeError, self.round_trip, MISSING_PRIVATE_KEY, SHORT)
self.assertRaises(envelope.EnvelopeError, self.round_trip_strings, BLANK_PUBLIC_KEY, SHORT)
self.assertRaises(envelope.EnvelopeError, self.round_trip_strings, BLANK_PRIVATE_KEY, SHORT)
def test_bad_call(self):
# neither key nor keyfile
self.assertRaises(envelope.EnvelopeError, envelope.encrypt, SHORT)
# both
self.assertRaises(envelope.EnvelopeError, envelope.encrypt, SHORT, KEY_1[0], open(KEY_1[0], 'rb').read())
# string key instead of bytes
self.assertRaises(envelope.EnvelopeError, envelope.encrypt, SHORT, key=open(KEY_1[0], 'r').read())
# string data instead of bytes
self.assertRaises(envelope.EnvelopeError, envelope.encrypt, str(SHORT), KEY_1[0], open(KEY_1[0], 'rb').read())
# get valid encryption data
iv, aes_key, enc = envelope.encrypt(SHORT, KEY_1[1])
# neither key nor keyfile
self.assertRaises(envelope.EnvelopeError, envelope.decrypt, iv, aes_key, enc)
# both
self.assertRaises(envelope.EnvelopeError, envelope.decrypt, iv, aes_key, enc, KEY_1[1], key=open(KEY_1[1], 'rb').read())
# string key instead of bytes
self.assertRaises(envelope.EnvelopeError, envelope.decrypt, iv, aes_key, enc, key=open(KEY_1[1], 'r').read())
# string data instead of bytes
self.assertRaises(envelope.EnvelopeError, envelope.decrypt, iv, aes_key, str(enc), KEY_1[1], key=open(KEY_1[1], 'rb').read())
# string iv instead of bytes
self.assertRaises(envelope.EnvelopeError, envelope.decrypt, str(iv), aes_key, enc, KEY_1[1], key=open(KEY_1[1], 'rb').read())
# string key instead of bytes
self.assertRaises(envelope.EnvelopeError, envelope.decrypt, iv, str(aes_key), enc, KEY_1[1], key=open(KEY_1[1], 'rb').read())
def test_bad_rand_bytes_1(self):
rand_bytes = envelope.evp.RAND_bytes
envelope.evp.RAND_bytes = lambda a,b:0
self.assertRaises(envelope.EnvelopeError, envelope.encrypt, SHORT, KEY_1[1])
envelope.evp.RAND_bytes = rand_bytes
def test_bad_rand_bytes_2(self):
rand_bytes = envelope.evp.RAND_bytes
envelope.evp.RAND_bytes = run_n_times(evp.RAND_bytes, lambda a,b:0, 1)
self.assertRaises(envelope.EnvelopeError, envelope.encrypt, SHORT, KEY_1[1])
envelope.evp.RAND_bytes = rand_bytes
def test_bad_rsa_size(self):
rsa_size = envelope.evp.RSA_size
envelope.evp.RSA_size = lambda a:0
self.assertRaises(envelope.EnvelopeError, envelope.encrypt, SHORT, KEY_1[1])
envelope.evp.RSA_size = rsa_size
def test_bad_read_privatekey(self):
iv, key, enc = envelope.encrypt(SHORT, KEY_1[1])
read_private_key = envelope.evp.PEM_read_PrivateKey
envelope.evp.PEM_read_PrivateKey = lambda a,b,c,d: None
self.assertRaises(envelope.EnvelopeError, envelope.decrypt, iv, key, enc, KEY_1[0])
envelope.evp.PEM_read_PrivateKey = read_private_key
def test_bad_read_publickey(self):
read_public_key = envelope.evp.PEM_read_PUBKEY
envelope.evp.PEM_read_PUBKEY = lambda a,b,c,d: None
self.assertRaises(envelope.EnvelopeError, envelope.encrypt, SHORT, KEY_1[1])
envelope.evp.PEM_read_PUBKEY = read_public_key
def test_bad_read_bio_privatekey(self):
iv, key, enc = envelope.encrypt(SHORT, KEY_1[1])
read_private_key = envelope.evp.PEM_read_bio_PrivateKey
envelope.evp.PEM_read_bio_PrivateKey = lambda a,b,c,d: None
self.assertRaises(envelope.EnvelopeError, envelope.decrypt, iv, key, enc, key=open(KEY_1[0], 'rb').read())
envelope.evp.PEM_read_bio_PrivateKey = read_private_key
def test_bad_read_bio_publickey(self):
read_public_key = envelope.evp.PEM_read_bio_PUBKEY
envelope.evp.PEM_read_bio_PUBKEY = lambda a,b,c,d: None
self.assertRaises(envelope.EnvelopeError, envelope.encrypt, SHORT, key=open(KEY_1[0], 'rb').read())
envelope.evp.PEM_read_bio_PUBKEY = read_public_key
def test_bad_fopen(self):
iv, aes_key, enc = envelope.encrypt(SHORT, KEY_1[1])
file_open = envelope.evp.fopen
envelope.evp.fopen = lambda a,b: None
self.assertRaises(envelope.EnvelopeError, envelope.encrypt, SHORT, KEY_1[1])
self.assertRaises(envelope.EnvelopeError, envelope.decrypt, iv, aes_key, enc, KEY_1[0])
envelope.evp.fopen = file_open
def test_bad_ctx_new(self):
iv, aes_key, enc = envelope.encrypt(SHORT, KEY_1[1])
new_ctx = envelope.evp.EVP_CIPHER_CTX_new
envelope.evp.EVP_CIPHER_CTX_new = lambda: None
self.assertRaises(envelope.EnvelopeError, envelope.encrypt, SHORT, KEY_1[1])
self.assertRaises(envelope.EnvelopeError, envelope.decrypt, iv, aes_key, enc, KEY_1[0])
envelope.evp.EVP_CIPHER_CTX_new = new_ctx
def test_bad_cipher_object(self):
iv, key, enc = envelope.encrypt(SHORT, KEY_1[1])
cipher_getter = envelope.evp.EVP_aes_192_cbc
envelope.evp.EVP_aes_192_cbc= lambda: None
self.assertRaises(envelope.EnvelopeError, envelope.encrypt, SHORT, KEY_1[1])
self.assertRaises(envelope.EnvelopeError, envelope.decrypt, iv, key, enc, KEY_1[0])
envelope.evp.EVP_aes_192_cbc = cipher_getter
def test_bad_encrypt_init_1(self):
encrypt_init = envelope.evp.EVP_EncryptInit_ex
envelope.evp.EVP_EncryptInit_ex = lambda a,b,c,d,e: None
self.assertRaises(envelope.EnvelopeError, envelope.encrypt, SHORT, KEY_1[1])
envelope.evp.EVP_EncryptInit_ex = encrypt_init
def test_bad_encrypt_init_2(self):
encrypt_init = envelope.evp.EVP_EncryptInit_ex
envelope.evp.EVP_EncryptInit_ex = run_n_times(envelope.evp.EVP_EncryptInit_ex, lambda a,b,c,d,e: None, 1)
self.assertRaises(envelope.EnvelopeError, envelope.encrypt, SHORT, KEY_1[1])
envelope.evp.EVP_EncryptInit_ex = encrypt_init
def test_bad_encrypt_update(self):
encrypt_update = envelope.evp.EVP_EncryptUpdate
envelope.evp.EVP_EncryptUpdate = lambda a,b,c,d,e: None
self.assertRaises(envelope.EnvelopeError, envelope.encrypt, SHORT, KEY_1[1])
envelope.evp.EVP_EncryptUpdate = encrypt_update
def test_bad_encrypt_final(self):
encrypt_final = envelope.evp.EVP_EncryptFinal_ex
envelope.evp.EVP_EncryptFinal_ex = lambda a,b,c: None
self.assertRaises(envelope.EnvelopeError, envelope.encrypt, SHORT, KEY_1[1])
envelope.evp.EVP_EncryptFinal_ex = encrypt_final
def test_bad_open_init(self):
iv, key, enc = envelope.encrypt(SHORT, KEY_1[1])
decrypt_init = envelope.evp.EVP_OpenInit
envelope.evp.EVP_OpenInit = lambda a,b,c,d,e,f: None
self.assertRaises(envelope.EnvelopeError, envelope.decrypt, iv, key, enc, KEY_1[0])
envelope.evp.EVP_OpenInit = decrypt_init
def test_bad_decrypt_update(self):
iv, key, enc = envelope.encrypt(SHORT, KEY_1[1])
decrypt_update = envelope.evp.EVP_DecryptUpdate
envelope.evp.EVP_DecryptUpdate = lambda a,b,c,d,e: None
self.assertRaises(envelope.EnvelopeError, envelope.decrypt, iv, key, enc, KEY_1[0])
envelope.evp.EVP_DecryptUpdate = decrypt_update
def test_bad_decrypt_final(self):
iv, key, enc = envelope.encrypt(SHORT, KEY_1[1])
decrypt_final = envelope.evp.EVP_DecryptFinal_ex
envelope.evp.EVP_DecryptFinal_ex = lambda a,b,c: None
self.assertRaises(envelope.EnvelopeError, envelope.decrypt, iv, key, enc, KEY_1[0])
cipher.evp.EVP_DecryptFinal_ex = decrypt_final
def test_bad_rsa_get(self):
get_rsa = envelope.evp.EVP_PKEY_get1_RSA
envelope.evp.EVP_PKEY_get1_RSA = lambda a: None
self.assertRaises(envelope.EnvelopeError, envelope.encrypt, SHORT, KEY_1[1])
envelope.evp.EVP_PKEY_get1_RSA = get_rsa
def test_bad_rsa_encrypt(self):
encrypt_rsa = envelope.evp.RSA_public_encrypt
envelope.evp.RSA_public_encrypt = lambda a,b,c,d,e: None
self.assertRaises(envelope.EnvelopeError, envelope.encrypt, SHORT, KEY_1[1])
envelope.evp.RSA_public_encrypt = encrypt_rsa
def test_bad_read_privatekey(self):
iv, key, enc = envelope.encrypt(SHORT, KEY_1[1])
read_private_key = envelope.evp.PEM_read_PrivateKey
envelope.evp.PEM_read_PrivateKey = lambda a,b,c,d: None
self.assertRaises(envelope.EnvelopeError, envelope.decrypt, iv, key, enc, KEY_1[0])
envelope.evp.PEM_read_PrivateKey = read_private_key
def test_bad_read_publickey(self):
read_public_key = envelope.evp.PEM_read_PUBKEY
envelope.evp.PEM_read_PUBKEY = lambda a,b,c,d: None
self.assertRaises(envelope.EnvelopeError, envelope.encrypt, SHORT, KEY_1[1])
envelope.evp.PEM_read_PUBKEY = read_public_key
def test_bad_read_bio_privatekey(self):
iv, key, enc = envelope.encrypt(SHORT, KEY_1[1])
read_private_key = envelope.evp.PEM_read_bio_PrivateKey
envelope.evp.PEM_read_bio_PrivateKey = lambda a,b,c,d: None
self.assertRaises(envelope.EnvelopeError, envelope.decrypt, iv, key, enc, key=open(KEY_1[0], 'rb').read())
envelope.evp.PEM_read_bio_PrivateKey = read_private_key
def test_bad_read_bio_publickey(self):
read_public_key = envelope.evp.PEM_read_bio_PUBKEY
envelope.evp.PEM_read_bio_PUBKEY = lambda a,b,c,d: None
self.assertRaises(envelope.EnvelopeError, envelope.encrypt, SHORT, key=open(KEY_1[0], 'rb').read())
envelope.evp.PEM_read_bio_PUBKEY = read_public_key
def test_bad_fopen(self):
iv, aes_key, enc = envelope.encrypt(SHORT, KEY_1[1])
file_open = envelope.evp.fopen
envelope.evp.fopen = lambda a,b: None
self.assertRaises(envelope.EnvelopeError, envelope.encrypt, SHORT, KEY_1[1])
self.assertRaises(envelope.EnvelopeError, envelope.decrypt, iv, aes_key, enc, KEY_1[0])
envelope.evp.fopen = file_open
def test_bad_ctx_new(self):
iv, aes_key, enc = envelope.encrypt(SHORT, KEY_1[1])
new_ctx = envelope.evp.EVP_CIPHER_CTX_new
envelope.evp.EVP_CIPHER_CTX_new = lambda: None
self.assertRaises(envelope.EnvelopeError, envelope.encrypt, SHORT, KEY_1[1])
self.assertRaises(envelope.EnvelopeError, envelope.decrypt, iv, aes_key, enc, KEY_1[0])
envelope.evp.EVP_CIPHER_CTX_new = new_ctx
def test_bad_cipher_object(self):
iv, key, enc = envelope.encrypt(SHORT, KEY_1[1])
cipher_getter = envelope.evp.EVP_aes_192_cbc
envelope.evp.EVP_aes_192_cbc= lambda: None
self.assertRaises(envelope.EnvelopeError, envelope.encrypt, SHORT, KEY_1[1])
self.assertRaises(envelope.EnvelopeError, envelope.decrypt, iv, key, enc, KEY_1[0])
envelope.evp.EVP_aes_192_cbc = cipher_getter
def test_bad_encrypt_init(self):
encrypt_init = envelope.evp.EVP_EncryptInit_ex
envelope.evp.EVP_EncryptInit_ex = lambda a,b,c,d,e: None
self.assertRaises(envelope.EnvelopeError, envelope.encrypt, SHORT, KEY_1[1])
envelope.evp.EVP_EncryptInit_ex = encrypt_init
def test_bad_encrypt_update(self):
encrypt_update = envelope.evp.EVP_EncryptUpdate
envelope.evp.EVP_EncryptUpdate = lambda a,b,c,d,e: None
self.assertRaises(envelope.EnvelopeError, envelope.encrypt, SHORT, KEY_1[1])
envelope.evp.EVP_EncryptUpdate = encrypt_update
def test_bad_encrypt_final(self):
encrypt_final = envelope.evp.EVP_EncryptFinal_ex
envelope.evp.EVP_EncryptFinal_ex = lambda a,b,c: None
self.assertRaises(envelope.EnvelopeError, envelope.encrypt, SHORT, KEY_1[1])
envelope.evp.EVP_EncryptFinal_ex = encrypt_final
def test_bad_open_init(self):
iv, key, enc = envelope.encrypt(SHORT, KEY_1[1])
decrypt_init = envelope.evp.EVP_OpenInit
envelope.evp.EVP_OpenInit = lambda a,b,c,d,e,f: None
self.assertRaises(envelope.EnvelopeError, envelope.decrypt, iv, key, enc, KEY_1[0])
envelope.evp.EVP_OpenInit = decrypt_init
def test_bad_decrypt_update(self):
iv, key, enc = envelope.encrypt(SHORT, KEY_1[1])
decrypt_update = envelope.evp.EVP_DecryptUpdate
envelope.evp.EVP_DecryptUpdate = lambda a,b,c,d,e: None
self.assertRaises(envelope.EnvelopeError, envelope.decrypt, iv, key, enc, KEY_1[0])
envelope.evp.EVP_DecryptUpdate = decrypt_update
def test_bad_decrypt_final(self):
iv, key, enc = envelope.encrypt(SHORT, KEY_1[1])
decrypt_final = envelope.evp.EVP_DecryptFinal_ex
envelope.evp.EVP_DecryptFinal_ex = lambda a,b,c: None
self.assertRaises(envelope.EnvelopeError, envelope.decrypt, iv, key, enc, KEY_1[0])
cipher.evp.EVP_DecryptFinal_ex = decrypt_final
def test_bad_rsa_get(self):
get_rsa = envelope.evp.EVP_PKEY_get1_RSA
envelope.evp.EVP_PKEY_get1_RSA = lambda a: None
self.assertRaises(envelope.EnvelopeError, envelope.encrypt, SHORT, KEY_1[1])
envelope.evp.EVP_PKEY_get1_RSA = get_rsa
def test_bad_rsa_encrypt(self):
encrypt_rsa = envelope.evp.RSA_public_encrypt
envelope.evp.RSA_public_encrypt = lambda a,b,c,d,e: None
self.assertRaises(envelope.EnvelopeError, envelope.encrypt, SHORT, KEY_1[1])
envelope.evp.RSA_public_encrypt = encrypt_rsa
if __name__ == "__main__":
unittest.main()

View file

@ -58,7 +58,7 @@
"""
from distutils.core import setup
from setuptools import setup
setup(
name='tuf',
@ -67,8 +67,8 @@
author='https://www.updateframework.com',
author_email='info@updateframework.com',
url='https://www.updateframework.com',
install_requires=['pycrypto>=2.6'],
packages=[
'evpy',
'tuf',
'tuf.client',
'tuf.compatibility',
@ -76,8 +76,7 @@
'tuf.pushtools',
'tuf.pushtools.transfer',
'tuf.repo',
'tuf.tests',
'tuf.tests.system_tests'
'tuf.tests'
],
scripts=[
'tuf/repo/quickstart.py',

View file

@ -33,10 +33,10 @@
import shutil
import urllib
import tempfile
import util_test_tools
import tuf
from tuf.interposition import urllib_tuf
from tuf.tests import util_test_tools

View file

@ -30,7 +30,7 @@
import tuf.repo.keystore as keystore
import tuf.repo.signercli as signercli
import tuf.repo.signerlib as signerlib
import util_test_tools
from tuf.tests import util_test_tools
version = 1
# Modify the number of iterations (from the higher default count) so the unit

View file

@ -34,14 +34,13 @@
from __future__ import print_function
import os
import shutil
import urllib
import tempfile
import util_test_tools
import tuf
from tuf.interposition import urllib_tuf
from tuf.log import logger
from tuf.tests import util_test_tools
class EndlessDataAttack(Exception):
pass
@ -79,12 +78,17 @@ def test_arbitrary_package_attack(TUF=False, TIMESTAMP=False):
downloads = os.path.join(root_repo, 'downloads')
tuf_targets = os.path.join(tuf_repo, 'targets')
# Original data.
INTENDED_DATA = 'Test A'
# Add a file to 'repo' directory: {root_repo}
filepath = util_test_tools.add_file_to_repository(reg_repo, 'Test A')
filepath = util_test_tools.add_file_to_repository(reg_repo, INTENDED_DATA)
file_basename = os.path.basename(filepath)
url_to_repo = url+'reg_repo/'+file_basename
downloaded_file = os.path.join(downloads, file_basename)
endless_data = 'A'*100000
# We do not deliver truly endless data, but we will extend the original
# file by many bytes.
noisy_data = 'X'*100000
if TUF:
@ -99,55 +103,57 @@ def test_arbitrary_package_attack(TUF=False, TIMESTAMP=False):
# Attacker modifies the file at the targets repository.
target = os.path.join(tuf_targets, file_basename)
util_test_tools.modify_file_at_repository(target, endless_data)
original_data = util_test_tools.read_file_content(target)
larger_original_data = original_data + noisy_data
util_test_tools.modify_file_at_repository(target, larger_original_data)
# Attacker modifies the timestamp.txt metadata.
if TIMESTAMP:
metadata = os.path.join(tuf_repo, 'metadata')
timestamp = os.path.join(metadata, 'timestamp.txt')
# FIXME: This does not correctly "patch" the timestamp metadata.
util_test_tools.modify_file_at_repository(timestamp, endless_data)
original_data = util_test_tools.read_file_content(timestamp)
larger_original_data = original_data + noisy_data
util_test_tools.modify_file_at_repository(timestamp,
larger_original_data)
# Attacker modifies the file at the regular repository.
util_test_tools.modify_file_at_repository(filepath, endless_data)
original_data = util_test_tools.read_file_content(filepath)
larger_original_data = original_data + noisy_data
util_test_tools.modify_file_at_repository(filepath, larger_original_data)
# End Setup.
# Client downloads (tries to download) the file.
try:
# Client downloads (tries to download) the file.
_download(url=url_to_repo, filename=downloaded_file, TUF=TUF)
except Exception, exception:
# Because we are extending the true timestamp TUF metadata with invalid
# JSON, we except to catch an error about invalid metadata JSON.
if TUF and TIMESTAMP:
endless_data_attack = False
except tuf.NoWorkingMirrorError, exception:
endless_data_attack = False
for mirror_url, mirror_error in exception.mirror_errors.iteritems():
if isinstance(mirror_error, tuf.InvalidMetadataJSONError):
endless_data_attack = True
break
for mirror_url, mirror_error in exception.mirror_errors.iteritems():
# We would get a bad hash error if the file was actually larger than
# the metadata said it was.
if isinstance(mirror_error, tuf.BadHashError):
endless_data_attack = True
break
# We would get invalid metadata JSON if the server deliberately sent
# malformed JSON as part of an endless data attack.
elif isinstance(mirror_error, tuf.InvalidMetadataJSONError):
endless_data_attack = True
break
# In case we did not detect what was likely an endless data attack, we
# reraise the exception to indicate that endless data attack detection
# failed.
if not endless_data_attack: raise
else: raise
# In case we did not detect what was likely an endless data attack, we
# reraise the exception to indicate that endless data attack detection
# failed.
if not endless_data_attack:
raise
else:
# When we test downloading "endless" timestamp with TUF, we want to skip
# the following test because downloading the timestamp should have failed.
if not (TUF and TIMESTAMP):
# Check whether the attack succeeded by inspecting the content of the
# update. The update should contain 'Test A'. Technically it suffices
# to check whether the file was downloaded or not.
downloaded_content = util_test_tools.read_file_content(downloaded_file)
if 'Test A' != downloaded_content:
if downloaded_content != INTENDED_DATA:
raise EndlessDataAttack(ERROR_MSG)
finally:
util_test_tools.cleanup(root_repo, server_proc)
@ -157,27 +163,26 @@ def test_arbitrary_package_attack(TUF=False, TIMESTAMP=False):
try:
test_arbitrary_package_attack(TUF=False, TIMESTAMP=False)
except EndlessDataAttack, error:
print('Without TUF: '+str(error))
print('Endless data attack worked on download without TUF!')
try:
test_arbitrary_package_attack(TUF=True, TIMESTAMP=False)
except EndlessDataAttack, error:
print('With TUF: '+str(error))
print('Endless data attack worked on download without TUF!')
print(str(error))
else:
print('Endless data attack did not work on download with TUF!')
try:
# This test fails because the timestamp metadata has been extended with
# random data from its true length, thereby resulting in invalid JSON.
test_arbitrary_package_attack(TUF=True, TIMESTAMP=True)
except EndlessDataAttack, error:
print('With TUF: '+str(error))
print('Endless data attack worked on download without TUF!')
print(str(error))
else:
print('Endless data attack did not work on download with TUF!')

View file

@ -1,3 +1,5 @@
#!/usr/bin/env python
"""
<Program Name>
test_extraneous_dependencies_attack.py
@ -42,7 +44,7 @@
import tuf
import tuf.interposition
import util_test_tools
from tuf.tests import util_test_tools
class ExtraneousDependencyAlert(Exception):

View file

@ -25,12 +25,12 @@
import shutil
import urllib
import tempfile
import util_test_tools
import tuf
import tuf.formats
import tuf.repo.signerlib as signerlib
from tuf.interposition import urllib_tuf
from tuf.tests import util_test_tools
class IndefiniteFreezeAttackAlert(Exception):

View file

@ -40,8 +40,8 @@
import tempfile
import tuf
import util_test_tools
from tuf.interposition import urllib_tuf
from tuf.tests import util_test_tools
class MixAndMatchAttackAlert(Exception):

View file

@ -38,8 +38,8 @@
import urllib
import tempfile
import tuf.tests.system_tests.util_test_tools as util_test_tools
from tuf.interposition import urllib_tuf
from tuf.tests import util_test_tools
class TestSetupError(Exception):

View file

@ -49,8 +49,8 @@
import urllib
import tuf.tests.system_tests.util_test_tools as util_test_tools
from tuf.interposition import urllib_tuf
from tuf.tests import util_test_tools
class SlowRetrievalAttackAlert(Exception):

View file

@ -28,7 +28,7 @@
import tuf.repo.keystore as keystore
import tuf.repo.signerlib as signerlib
import tuf.repo.signercli as signercli
import tuf.tests.unittest_toolbox as unittest_toolbox
import unittest_toolbox as unittest_toolbox

View file

@ -37,7 +37,7 @@
import tuf.conf as conf
import tuf.download as download
import tuf.log
import tuf.tests.unittest_toolbox as unittest_toolbox
import unittest_toolbox as unittest_toolbox
logger = logging.getLogger('tuf.test_download')
@ -110,20 +110,17 @@ def test_download_url_to_tempfileobj_and_lengths(self):
download.safe_download(self.url, self.target_data_length - 1)
download.unsafe_download(self.url, self.target_data_length - 1)
# NOTE: We catch tuf.DownloadError here because the STRICT_REQUIRED_LENGTH,
# which is True by default, mandates that we must download exactly what is
# required.
exception_message = 'Downloaded '+str(self.target_data_length)+\
' bytes, but expected '+\
str(self.target_data_length+1)+\
' bytes. There is a difference of 1 bytes!'
self.assertRaisesRegexp(tuf.DownloadError, exception_message,
download.safe_download, self.url,
self.target_data_length + 1)
# NOTE: We catch tuf.DownloadLengthMismatchError here because the
# STRICT_REQUIRED_LENGTH, which is True by default, mandates that we must
# download exactly what is required.
self.assertRaises(tuf.DownloadLengthMismatchError, download.safe_download,
self.url, self.target_data_length + 1)
# NOTE: However, we do not catch a tuf.DownloadError here for the same test
# as the previous one because we have disabled STRICT_REQUIRED_LENGTH.
temp_fileobj = download.unsafe_download(self.url, self.target_data_length + 1)
# NOTE: However, we do not catch a tuf.DownloadLengthMismatchError here for
# the same test as the previous one because we have disabled
# STRICT_REQUIRED_LENGTH.
temp_fileobj = download.unsafe_download(self.url,
self.target_data_length + 1)
self.assertEquals(self.target_data, temp_fileobj.read())
self.assertEquals(self.target_data_length, len(temp_fileobj.read()))
temp_fileobj.close_temp_file()

View file

@ -21,15 +21,15 @@
import tuf
import tuf.formats as formats
import tuf.mirrors as mirrors
import tuf.tests.unittest_toolbox
import unittest_toolbox
class TestMirrors(tuf.tests.unittest_toolbox.Modified_TestCase):
class TestMirrors(unittest_toolbox.Modified_TestCase):
def setUp(self):
tuf.tests.unittest_toolbox.Modified_TestCase.setUp(self)
unittest_toolbox.Modified_TestCase.setUp(self)
self.mirrors = \
{'mirror1': {'url_prefix' : 'http://mirror1.com',

View file

@ -28,7 +28,7 @@
import tuf.pushtools.push as push
import tuf.pushtools.transfer.scp as scp
import tuf.pushtools.pushtoolslib as pushtoolslib
import tuf.tests.system_tests.util_test_tools as util_test_tools
import tuf.tests.util_test_tools as util_test_tools
logger = logging.getLogger('tuf.test_push')

View file

@ -30,10 +30,10 @@
import tuf.log
import tuf.repo.quickstart as quickstart
import tuf.util
import tuf.tests.unittest_toolbox
import unittest_toolbox
logger = logging.getLogger('tuf.test_quickstart')
unit_tbox = tuf.tests.unittest_toolbox.Modified_TestCase
unit_tbox = unittest_toolbox.Modified_TestCase
logger.info('from test_quickstart')

View file

@ -52,7 +52,7 @@ class guarantees the order of unit tests. So that, 'test_something_A'
import tuf.repo.signercli as signercli
# Helper module unittest_toolbox.py
import tuf.tests.unittest_toolbox as unittest_toolbox
import unittest_toolbox as unittest_toolbox
logger = logging.getLogger('tuf.test_signercli')

View file

@ -61,12 +61,12 @@
import tuf.repo.signerlib as signerlib
import tuf.repo.keystore
import tuf.tests.unittest_toolbox
import unittest_toolbox
logger = logging.getLogger('tuf.test_signerlib')
# 'unittest_toolbox.Modified_TestCase' is too long, I'll set it to 'unit_tbox'.
unit_tbox = tuf.tests.unittest_toolbox.Modified_TestCase
unit_tbox = unittest_toolbox.Modified_TestCase

View file

@ -53,8 +53,8 @@ class guarantees the order of unit tests. So that, 'test_something_A'
import tuf.repo.keystore as keystore
import tuf.repo.signerlib as signerlib
import tuf.roledb
import tuf.tests.repository_setup as setup
import tuf.tests.unittest_toolbox as unittest_toolbox
import repository_setup as setup
import unittest_toolbox as unittest_toolbox
import tuf.util
logger = logging.getLogger('tuf.test_updater')
@ -67,6 +67,8 @@ class guarantees the order of unit tests. So that, 'test_something_A'
'length': tuf.conf.DEFAULT_TIMESTAMP_REQUIRED_LENGTH
}
original_safe_download = tuf.download.safe_download
original_unsafe_download = tuf.download.unsafe_download
class TestUpdater_init_(unittest_toolbox.Modified_TestCase):
@ -404,6 +406,27 @@ def test_1__rebuild_key_and_role_db(self):
def test_1__update_fileinfo(self):
# Tests
# Verify that fileinfo dictionary is empty.
self.assertFalse(self.Repository.fileinfo)
# Load file info for top level roles. This populates the fileinfo
# dictionary.
for role in self.role_list:
self.Repository._update_fileinfo(role+'.txt')
# Verify that fileinfo has been populated and contains appropriate data.
self.assertTrue(self.Repository.fileinfo)
for role in self.role_list:
role_filepath = os.path.join(self.client_current_dir, role+'.txt')
role_info = tuf.util.get_file_details(role_filepath)
role_info_dict = {'length':role_info[0], 'hashes':role_info[1]}
self.assertTrue(role+'.txt' in self.Repository.fileinfo.keys())
self.assertEqual(self.Repository.fileinfo[role+'.txt'], role_info_dict)
def test_2__import_delegations(self):
@ -485,6 +508,86 @@ def test_2__ensure_all_targets_allowed(self):
def test_2__fileinfo_has_changed(self):
# Verify that the method returns 'False' if file info was not changed.
for role in self.role_list:
role_filepath = os.path.join(self.client_current_dir, role+'.txt')
role_info = tuf.util.get_file_details(role_filepath)
role_info_dict = {'length':role_info[0], 'hashes':role_info[1]}
self.assertFalse(self.Repository._fileinfo_has_changed(role+'.txt',
role_info_dict))
# Verify that the method returns 'True' if length or hashes were changed.
for role in self.role_list:
role_filepath = os.path.join(self.client_current_dir, role+'.txt')
role_info = tuf.util.get_file_details(role_filepath)
role_info_dict = {'length':8, 'hashes':role_info[1]}
self.assertTrue(self.Repository._fileinfo_has_changed(role+'.txt',
role_info_dict))
for role in self.role_list:
role_filepath = os.path.join(self.client_current_dir, role+'.txt')
role_info = tuf.util.get_file_details(role_filepath)
role_info_dict = {'length':role_info[0],
'hashes':{'sha256':self.random_string()}}
self.assertTrue(self.Repository._fileinfo_has_changed(role+'.txt',
role_info_dict))
def test_2__move_current_to_previous(self):
# The test will consist of removing a metadata file from client's
# {client_repository}/metadata/previous directory, executing the method
# and then verifying that the 'previous' directory contains
# the release file.
release_meta_path = os.path.join(self.client_previous_dir, 'release.txt')
os.remove(release_meta_path)
self.assertFalse(os.path.exists(release_meta_path))
self.Repository._move_current_to_previous('release')
self.assertTrue(os.path.exists(release_meta_path))
shutil.copy(release_meta_path, self.client_current_dir)
def test_2__delete_metadata(self):
# This test will verify that 'root' metadata is never deleted, when
# role is deleted verify that the file is not present in the
# self.Repository.metadata dictionary.
self.Repository._delete_metadata('root')
self.assertTrue('root' in self.Repository.metadata['current'])
self.Repository._delete_metadata('timestamp')
self.assertFalse('timestamp' in self.Repository.metadata['current'])
timestamp_meta_path = os.path.join(self.client_previous_dir,
'timestamp.txt')
shutil.copy(timestamp_meta_path, self.client_current_dir)
def test_2__ensure_not_expired(self):
# This test condition will verify that nothing is raised when a metadata
# file has a future expiration date.
self.Repository._ensure_not_expired('root')
# 'tuf.ExpiredMetadataError' should be raised in this next test condition,
# because the expiration_date has expired by 10 seconds.
expires = tuf.formats.format_time(time.time() - 10)
self.Repository.metadata['current']['root']['expires'] = expires
# Ensure the 'expires' field of the root file is properly formatted.
self.assertTrue(tuf.formats.ROOT_SCHEMA.matches(self.Repository.metadata\
['current']['root']))
self.assertRaises(tuf.ExpiredMetadataError,
self.Repository._ensure_not_expired, 'root')
def test_3__update_metadata(self):
"""
This unit test verifies the method's proper behaviour on the expected input.
@ -520,8 +623,9 @@ def test_3__update_metadata(self):
# Test: normal case.
# Patch 'download.download_url_to_tempfileobj' function.
self._mock_download_url_to_tempfileobj(self.targets_filepath)
_update_metadata('targets',
signerlib.get_metadata_file_info(self.targets_filepath))
uncompressed_fileinfo = \
signerlib.get_metadata_file_info(self.targets_filepath)
_update_metadata('targets', uncompressed_fileinfo)
list_of_targets = self.Repository.metadata['current']['targets']['targets']
# Verify that the added target's path is listed in target's metadata.
@ -532,18 +636,24 @@ def test_3__update_metadata(self):
# Test: normal case, compressed metadata file.
# Add a file to targets directory and rebuild targets metadata.
added_target_2 = self._add_target_to_targets_dir(targets_keyids)
uncompressed_fileinfo = \
signerlib.get_metadata_file_info(self.targets_filepath)
# To test compressed file handling, compress targets metadata file.
targets_filepath_compressed = self._compress_file(self.targets_filepath)
targets_filepath_compressed = self._compress_file(self.targets_filepath)
compressed_fileinfo = \
signerlib.get_metadata_file_info(targets_filepath_compressed)
# Re-patch 'download.download_url_to_tempfileobj' function.
self._mock_download_url_to_tempfileobj(targets_filepath_compressed)
# TODO: Not convinced this is actually being tested correctly.
# See how we get fileinfo in tuf.client.updater._update_metadata_if_changed
_update_metadata('targets',
#signerlib.get_metadata_file_info(self.targets_filepath),
None,
compression='gzip')
# The length (but not the hash) passed to this function is incorrect. The
# length must be that of the compressed file, whereas the hash must be that
# of the uncompressed file.
mixed_fileinfo = {
'length': compressed_fileinfo['length'],
'hashes': uncompressed_fileinfo['hashes']
}
_update_metadata('targets', mixed_fileinfo, compression='gzip')
list_of_targets = self.Repository.metadata['current']['targets']['targets']
# Verify that the added target's path is listed in target's metadata.
@ -558,56 +668,6 @@ def test_3__update_metadata(self):
def test_1__update_fileinfo(self):
# Tests
# Verify that fileinfo dictionary is empty.
self.assertFalse(self.Repository.fileinfo)
# Load file info for top level roles. This populates the fileinfo
# dictionary.
for role in self.role_list:
self.Repository._update_fileinfo(role+'.txt')
# Verify that fileinfo has been populated and contains appropriate data.
self.assertTrue(self.Repository.fileinfo)
for role in self.role_list:
role_filepath = os.path.join(self.client_current_dir, role+'.txt')
role_info = tuf.util.get_file_details(role_filepath)
role_info_dict = {'length':role_info[0], 'hashes':role_info[1]}
self.assertTrue(role+'.txt' in self.Repository.fileinfo.keys())
self.assertEqual(self.Repository.fileinfo[role+'.txt'], role_info_dict)
def test_2__fileinfo_has_changed(self):
# Verify that the method returns 'False' if file info was not changed.
for role in self.role_list:
role_filepath = os.path.join(self.client_current_dir, role+'.txt')
role_info = tuf.util.get_file_details(role_filepath)
role_info_dict = {'length':role_info[0], 'hashes':role_info[1]}
self.assertFalse(self.Repository._fileinfo_has_changed(role+'.txt',
role_info_dict))
# Verify that the method returns 'True' if length or hashes were changed.
for role in self.role_list:
role_filepath = os.path.join(self.client_current_dir, role+'.txt')
role_info = tuf.util.get_file_details(role_filepath)
role_info_dict = {'length':8, 'hashes':role_info[1]}
self.assertTrue(self.Repository._fileinfo_has_changed(role+'.txt',
role_info_dict))
for role in self.role_list:
role_filepath = os.path.join(self.client_current_dir, role+'.txt')
role_info = tuf.util.get_file_details(role_filepath)
role_info_dict = {'length':role_info[0],
'hashes':{'sha256':self.random_string()}}
self.assertTrue(self.Repository._fileinfo_has_changed(role+'.txt',
role_info_dict))
def test_3__update_metadata_if_changed(self):
@ -696,12 +756,12 @@ def test_3__update_metadata_if_changed(self):
# Patch 'download.download_url_to_tempfileobj' and update targets.
self._mock_download_url_to_tempfileobj(self.root_filepath)
# TODO: Is this the original intent of this test?
# FIXME: What is the original intent of this test?
try:
update_if_changed('targets')
except tuf.NoWorkingMirrorError, exception:
for mirror_url, mirror_error in exception.mirror_errors.iteritems():
assert isinstance(mirror_error, tuf.BadHashError)
assert isinstance(mirror_error, tuf.DownloadLengthMismatchError)
# Restoring repositories to the initial state.
os.remove(release_filepath_compressed)
@ -711,53 +771,23 @@ def test_3__update_metadata_if_changed(self):
def test_2__move_current_to_previous(self):
# The test will consist of removing a metadata file from client's
# {client_repository}/metadata/previous directory, executing the method
# and then verifying that the 'previous' directory contains
# the release file.
release_meta_path = os.path.join(self.client_previous_dir, 'release.txt')
os.remove(release_meta_path)
self.assertFalse(os.path.exists(release_meta_path))
self.Repository._move_current_to_previous('release')
self.assertTrue(os.path.exists(release_meta_path))
shutil.copy(release_meta_path, self.client_current_dir)
def test_3__targets_of_role(self):
# Setup
targets_dir_content = os.listdir(self.targets_dir)
def test_2__delete_metadata(self):
# This test will verify that 'root' metadata is never deleted, when
# role is deleted verify that the file is not present in the
# self.Repository.metadata dictionary.
self.Repository._delete_metadata('root')
self.assertTrue('root' in self.Repository.metadata['current'])
self.Repository._delete_metadata('timestamp')
self.assertFalse('timestamp' in self.Repository.metadata['current'])
timestamp_meta_path = os.path.join(self.client_previous_dir,
'timestamp.txt')
shutil.copy(timestamp_meta_path, self.client_current_dir)
def test_2__ensure_not_expired(self):
# This test condition will verify that nothing is raised when a metadata
# file has a future expiration date.
self.Repository._ensure_not_expired('root')
# Test: normal case.
targets_list = self.Repository._targets_of_role('targets')
# 'tuf.ExpiredMetadataError' should be raised in this next test condition,
# because the expiration_date has expired by 10 seconds.
expires = tuf.formats.format_time(time.time() - 10)
self.Repository.metadata['current']['root']['expires'] = expires
# Ensure the 'expires' field of the root file is properly formatted.
self.assertTrue(tuf.formats.ROOT_SCHEMA.matches(self.Repository.metadata\
['current']['root']))
self.assertRaises(tuf.ExpiredMetadataError,
self.Repository._ensure_not_expired, 'root')
# Verify that list of targets was returned,
# and that it contains valid target file.
self.assertTrue(tuf.formats.TARGETFILES_SCHEMA.matches(targets_list))
targets_filepaths = []
for target in range(len(targets_list)):
targets_filepaths.append(targets_list[target]['filepath'])
for dir_target in targets_dir_content:
if dir_target.endswith('.txt'):
self.assertTrue(dir_target in targets_filepaths)
@ -857,27 +887,6 @@ def test_4__refresh_targets_metadata(self):
def test_3__targets_of_role(self):
# Setup
targets_dir_content = os.listdir(self.targets_dir)
# Test: normal case.
targets_list = self.Repository._targets_of_role('targets')
# Verify that list of targets was returned,
# and that it contains valid target file.
self.assertTrue(tuf.formats.TARGETFILES_SCHEMA.matches(targets_list))
targets_filepaths = []
for target in range(len(targets_list)):
targets_filepaths.append(targets_list[target]['filepath'])
for dir_target in targets_dir_content:
if dir_target.endswith('.txt'):
self.assertTrue(dir_target in targets_filepaths)
def test_5_all_targets(self):
@ -1138,6 +1147,8 @@ def tearDownModule():
# http://docs.python.org/2/library/unittest.html#class-and-module-fixtures
setup.remove_all_repositories(TestUpdater.repositories['main_repository'])
unittest_toolbox.Modified_TestCase.clear_toolbox()
tuf.download.safe_download = original_safe_download
tuf.download.unsafe_download = original_unsafe_download
if __name__ == '__main__':
unittest.main()

View file

@ -1,3 +1,5 @@
#!/usr/bin/env python
"""
<Program Name>
test_util.py
@ -28,7 +30,7 @@
import tuf.log
import tuf.hash
import tuf.util as util
import tuf.tests.unittest_toolbox as unittest_toolbox
import unittest_toolbox as unittest_toolbox
logger = logging.getLogger('tuf.test_util')
@ -37,7 +39,6 @@ class TestUtil(unittest_toolbox.Modified_TestCase):
def setUp(self):
unittest_toolbox.Modified_TestCase.setUp(self)
util.tempfile.TemporaryFile = tempfile.TemporaryFile
self.temp_fileobj = util.TempFile()
@ -71,10 +72,10 @@ def _extract_tempfile_directory(self, config_temp_dir=None):
# Patching 'tempfile.TemporaryFile()' (by substituting
# temfile.TemporaryFile() with tempfile.mkstemp()) in order to get the
# directory of the stored tempfile object.
saved_tempfile_TemporaryFile = util.tempfile.TemporaryFile
util.tempfile.TemporaryFile = tempfile.mkstemp
saved_tempfile_TemporaryFile = util.tempfile.NamedTemporaryFile
util.tempfile.NamedTemporaryFile = tempfile.mkstemp
_temp_fileobj = util.TempFile()
util.tempfile.TemporaryFile = saved_tempfile_TemporaryFile
util.tempfile.NamedTemporaryFile = saved_tempfile_TemporaryFile
junk, _tempfilepath = _temp_fileobj.temporary_file
_tempfile_dir = os.path.dirname(_tempfilepath)

View file

@ -1,3 +1,5 @@
#!/usr/bin/env python
"""
<Program Name>
test_util_test_tools.py
@ -18,7 +20,8 @@
import os
import urllib
import unittest
import util_test_tools
from tuf.tests import util_test_tools

View file

@ -192,7 +192,14 @@ class DownloadError(Error):
class DownloadLengthMismatchError(DownloadError):
"""Indicate that a mismatch of lengths was seen while downloading a file."""
pass
def __init__(self, expected_length, observed_length):
self.expected_length = expected_length #bytes
self.observed_length = observed_length #bytes
def __str__(self):
return 'Observed length ('+str(self.observed_length)+\
') <= expected length ('+str(self.expected_length)+')'

View file

@ -592,17 +592,17 @@ def refresh(self):
def __check_hashes(self, input_file, trusted_hashes):
def __check_hashes(self, file_object, trusted_hashes):
"""
<Purpose>
A helper function that verifies multiple secure hashes of the downloaded
file. If any of these fail it raises an exception. This is to conform
with the TUF specs, which support clients with different hashing
algorithms. The 'hash.py' module is used to compute the hashes of the
'input_file'.
'file_object'.
<Arguments>
input_file:
file_object:
A file-like object.
trusted_hashes:
@ -624,7 +624,7 @@ def __check_hashes(self, input_file, trusted_hashes):
# any of the hashes are incorrect and return if all are correct.
for algorithm, trusted_hash in trusted_hashes.items():
digest_object = tuf.hash.digest(algorithm)
digest_object.update(input_file.read())
digest_object.update(file_object.read())
computed_hash = digest_object.hexdigest()
if trusted_hash != computed_hash:
raise tuf.BadHashError('Hashes do not match! Expected '+
@ -636,6 +636,74 @@ def __check_hashes(self, input_file, trusted_hashes):
def __hard_check_length(self, file_object, trusted_length):
"""
<Purpose>
A helper function that checks the expected length of a file-like object.
The length of the file must be strictly equal to the expected length.
This is a deliberately redundant implementation designed to complement
tuf.download._check_downloaded_length().
<Arguments>
file_object:
A file-like object.
trusted_length:
A nonnegative integer that is the expected length of the file.
<Exceptions>
tuf.DownloadLengthMismatchError, if the lengths don't match.
<Side Effects>
None.
<Returns>
None.
"""
observed_length = len(file_object)
if observed_length != trusted_length:
raise tuf.DownloadLengthMismatchError(trusted_length, observed_length)
def __soft_check_length(self, file_object, trusted_length):
"""
<Purpose>
A helper function that checks the expected length of a file-like object.
The length of the file must be less than or equal to the expected length.
This is a deliberately redundant implementation designed to complement
tuf.download._check_downloaded_length().
<Arguments>
file_object:
A file-like object.
trusted_length:
A nonnegative integer that is the expected length of the file.
<Exceptions>
tuf.DownloadLengthMismatchError, if the lengths don't match.
<Side Effects>
None.
<Returns>
None.
"""
observed_length = len(file_object)
if observed_length > trusted_length:
raise tuf.DownloadLengthMismatchError(trusted_length, observed_length)
def get_target_file(self, target_filepath, file_length, file_hashes):
"""
<Purpose>
@ -667,21 +735,25 @@ def get_target_file(self, target_filepath, file_length, file_hashes):
"""
def verify_target_file(target_file_object):
# Every target file must have its hashes inspected.
def verify_decompressed_target_file(target_file_object):
# Every target file must have its length and hashes inspected.
self.__hard_check_length(target_file_object, file_length)
self.__check_hashes(target_file_object, file_hashes)
return self.__get_file(target_filepath, verify_target_file, 'target',
file_length, download_safely=True, compression=None)
return self.__get_file(target_filepath, verify_decompressed_target_file,
'target', file_length, download_safely=True,
compression=None)
def __verify_metadata_file(self, metadata_file_object, metadata_role):
def __verify_decompressed_metadata_file(self, metadata_file_object,
metadata_role):
"""
<Purpose>
A private helpe function to verify a downloaded metadata file.
A private helper function to verify a decompressed downloaded metadata
file.
<Arguments>
metadata_file_object:
@ -787,11 +859,14 @@ def unsafely_get_metadata_file(self, metadata_role, metadata_filepath,
"""
def unsafely_verify_metadata_file(metadata_file_object):
self.__verify_metadata_file(metadata_file_object, metadata_role)
def unsafely_verify_decompressed_metadata_file(metadata_file_object):
self.__soft_check_length(metadata_file_object, file_length)
self.__verify_decompressed_metadata_file(metadata_file_object,
metadata_role)
return self.__get_file(metadata_filepath, unsafely_verify_metadata_file,
'meta', file_length, download_safely=False,
return self.__get_file(metadata_filepath,
unsafely_verify_decompressed_metadata_file, 'meta',
file_length, download_safely=False,
compression=None)
@ -836,12 +911,15 @@ def safely_get_metadata_file(self, metadata_role, metadata_filepath,
"""
def safely_verify_metadata_file(metadata_file_object):
def safely_verify_decompressed_metadata_file(metadata_file_object):
self.__hard_check_length(metadata_file_object, file_length)
self.__check_hashes(metadata_file_object, file_hashes)
self.__verify_metadata_file(metadata_file_object, metadata_role)
self.__verify_decompressed_metadata_file(metadata_file_object,
metadata_role)
return self.__get_file(metadata_filepath, safely_verify_metadata_file,
'meta', file_length, download_safely=True,
return self.__get_file(metadata_filepath,
safely_verify_decompressed_metadata_file, 'meta',
file_length, download_safely=True,
compression=compression)
@ -851,7 +929,7 @@ def safely_verify_metadata_file(metadata_file_object):
# TODO: Instead of the more fragile 'download_safely' switch, unroll the
# function into two separate ones: one for "safe" download, and the other one
# for "unsafe" download? This should induce safer and more readable code.
def __get_file(self, filepath, verify_file, file_type,
def __get_file(self, filepath, verify_decompressed_file, file_type,
file_length, download_safely, compression):
"""
<Purpose>
@ -863,9 +941,9 @@ def __get_file(self, filepath, verify_file, file_type,
filepath:
The relative metadata or target filepath.
verify_file:
A function which expects a file-like object and which will raise an
exception in case the file is not valid for any reason.
verify_decompressed_file:
A function which expects a decompressed file-like object and which will
raise an exception in case the file is not valid for any reason.
file_type:
Type of data needed for download, must correspond to one of the strings
@ -911,9 +989,12 @@ def __get_file(self, filepath, verify_file, file_type,
file_object = tuf.download.unsafe_download(file_mirror, file_length)
if compression:
logger.debug('Decompressing '+str(file_mirror))
file_object.decompress_temp_file_object(compression)
else:
logger.debug('Not decompressing '+str(file_mirror))
verify_file(file_object)
verify_decompressed_file(file_object)
except Exception, exception:
# Remember the error from this mirror, and "reset" the target file.
@ -2155,10 +2236,10 @@ def _visit_child_role(self, child_role, target_filepath):
Ensure that we explore only delegated roles trusted with the target. We
assume conservation of delegated paths in the complete tree of
delegations. Note that the call to _ensure_all_targets_allowed in
__verify_metadata_file should already ensure that all targets metadata is
valid; i.e. that the targets signed by a delegatee is a proper subset of
the targets delegated to it by the delegator. Nevertheless, we check it
again here for performance and safety reasons.
__verify_decompressed_metadata_file should already ensure that all
targets metadata is valid; i.e. that the targets signed by a delegatee is
a proper subset of the targets delegated to it by the delegator.
Nevertheless, we check it again here for performance and safety reasons.
TODO: Should the TUF spec restrict the repository to one particular
algorithm? Should we allow the repository to specify in the role

View file

@ -29,7 +29,7 @@
import logging
import os.path
import socket
import time
import timeit
import tuf
import tuf.conf
@ -77,8 +77,11 @@ def __init__(self, sock, mode='rb', bufsize=-1, close=False):
# Count the number of bytes received with this socket.
self.__number_of_bytes_received = 0
# Count the seconds spent receiving with this socket.
self.__seconds_spent_receiving = 0
# Count the seconds spent receiving with this socket. Tolerate servers with
# a slow start by ignoring their delivery speed for
# tuf.conf.SLOW_START_GRACE_PERIOD seconds.
assert tuf.conf.SLOW_START_GRACE_PERIOD > 0
self.__seconds_spent_receiving = -tuf.conf.SLOW_START_GRACE_PERIOD
# Remember the time a clock was started.
self.__start_time = None
@ -108,8 +111,8 @@ def __start_clock(self):
# We must have reset the clock before this.
assert self.__start_time is None
# We are using wall time, so it will be imprecise sometimes.
self.__start_time = time.time()
# We use (platform-specific) wall time, so it will be imprecise sometimes.
self.__start_time = timeit.default_timer()
@ -139,8 +142,8 @@ def __stop_clock_and_check_speed(self, data_length):
"""
# We are using wall time, so it will be imprecise sometimes.
stop_time = time.time()
# We use (platform-specific) wall time, so it will be imprecise sometimes.
stop_time = timeit.default_timer()
# We must have already started the clock.
assert self.__start_time > 0
time_delta = stop_time-self.__start_time
@ -150,22 +153,24 @@ def __stop_clock_and_check_speed(self, data_length):
# Measure the average download speed.
self.__number_of_bytes_received += data_length
self.__seconds_spent_receiving += time_delta
average_download_speed = \
self.__number_of_bytes_received/self.__seconds_spent_receiving
# If the average download speed is below a certain threshold, we flag this
# as a possible slow-retrieval attack. This threshold will determine our
# bias: if it is too low, we will have more false positives; if it is too
# high, we will have more false negatives.
if average_download_speed < tuf.conf.MIN_AVERAGE_DOWNLOAD_SPEED:
if self.__seconds_spent_receiving <= tuf.conf.SLOW_START_GRACE_PERIOD:
logger.debug('Slow average download speed: '+\
str(average_download_speed)+' bytes/second')
if self.__seconds_spent_receiving > 0:
average_download_speed = \
self.__number_of_bytes_received/self.__seconds_spent_receiving
# If the average download speed is below a certain threshold, we flag this
# as a possible slow-retrieval attack. This threshold will determine our
# bias: if it is too low, we will have more false positives; if it is too
# high, we will have more false negatives.
if average_download_speed < tuf.conf.MIN_AVERAGE_DOWNLOAD_SPEED:
raise tuf.SlowRetrievalError(average_download_speed)
else:
raise tuf.SlowRetrievalError(average_download_speed)
logger.debug('Good average download speed: '+\
str(average_download_speed)+' bytes/second')
else:
logger.debug('Good average download speed: '+\
str(average_download_speed)+' bytes/second')
logger.debug('Ignoring average download speed for another: '+\
str(-self.__seconds_spent_receiving)+' seconds')
@ -642,7 +647,7 @@ def _check_downloaded_length(total_downloaded, required_length,
if STRICT_REQUIRED_LENGTH:
# This must be due to a programming error, and must never happen!
logger.error(message)
raise tuf.DownloadLengthMismatchError(message)
raise tuf.DownloadLengthMismatchError(required_length, total_downloaded)
else:
# We specifically disabled strict checking of required length, but we
# will log a warning anyway. This is useful when we wish to download the

View file

@ -20,7 +20,6 @@ class Logger(object):
"""A static logging object for tuf.interposition."""
tuf.log.add_console_handler()
__logger = logging.getLogger("tuf.interposition")

4
tuf/tests/__init__.py Executable file → Normal file
View file

@ -0,0 +1,4 @@
# The tuf.tests package is intended to export as little utility as possible to
# enable easier testing of TUF.
__all__ = ['util_test_tools']

View file

@ -1 +0,0 @@

View file

@ -52,7 +52,7 @@ class TempFile(object):
def _default_temporary_directory(self, prefix):
"""__init__ helper."""
try:
self.temporary_file = tempfile.TemporaryFile(prefix=prefix)
self.temporary_file = tempfile.NamedTemporaryFile(prefix=prefix)
except OSError, err:
logger.critical('Temp file in '+temp_dir+'failed: '+repr(err))
raise tuf.Error(err)
@ -66,7 +66,7 @@ def __init__(self, prefix='tuf_temp_'):
<Arguments>
prefix:
A string argument to be used with tempfile.TemporaryFile function.
A string argument to be used with tempfile.NamedTemporaryFile function.
<Exceptions>
tuf.Error on failure to load temp dir.
@ -82,7 +82,8 @@ def __init__(self, prefix='tuf_temp_'):
temp_dir = tuf.conf.temporary_directory
if temp_dir is not None and isinstance(temp_dir, str):
try:
self.temporary_file = tempfile.TemporaryFile(prefix=prefix, dir=temp_dir)
self.temporary_file = tempfile.NamedTemporaryFile(prefix=prefix,
dir=temp_dir)
except OSError, err:
logger.error('Temp file in '+temp_dir+' failed: '+repr(err))
logger.error('Will attempt to use system default temp dir.')
@ -92,6 +93,26 @@ def __init__(self, prefix='tuf_temp_'):
def __len__(self):
"""
<Purpose>
Initializes TempFile.
<Arguments>
None.
<Exceptions>
OSError.
<Return>
Nonnegative integer representing file size.
"""
return os.stat(self.temporary_file.name).st_size
def flush(self):
"""
<Purpose>