mirror of
https://github.com/theupdateframework/python-tuf
synced 2026-05-24 10:08:28 +00:00
Merge pull request #266 from vladimir-v-diaz/code_coverage
Improve Code Coverage
This commit is contained in:
commit
e034be2687
17 changed files with 396 additions and 89 deletions
52
tests/test_init.py
Executable file
52
tests/test_init.py
Executable file
|
|
@ -0,0 +1,52 @@
|
|||
#!/usr/bin/env python
|
||||
|
||||
"""
|
||||
<Program Name>
|
||||
test_init.py
|
||||
|
||||
<Author>
|
||||
Vladimir Diaz
|
||||
|
||||
<Started>
|
||||
March 30, 2015.
|
||||
|
||||
<Copyright>
|
||||
See LICENSE for licensing information.
|
||||
|
||||
<Purpose>
|
||||
Test cases for __init__.py (mainly the exceptions defined there).
|
||||
"""
|
||||
|
||||
# Help with Python 3 compatibility, where the print statement is a function, an
|
||||
# implicit relative import is invalid, and the '/' operator performs true
|
||||
# division. Example: print 'hello world' raises a 'SyntaxError' exception.
|
||||
from __future__ import print_function
|
||||
from __future__ import absolute_import
|
||||
from __future__ import division
|
||||
from __future__ import unicode_literals
|
||||
|
||||
import unittest
|
||||
import logging
|
||||
|
||||
import tuf
|
||||
|
||||
logger = logging.getLogger('tuf.test_keys')
|
||||
|
||||
class TestInit(unittest.TestCase):
|
||||
def setUp(self):
|
||||
pass
|
||||
|
||||
|
||||
def test_bad_signature_error(self):
|
||||
bad_signature_error = tuf.BadSignatureError('bad_role')
|
||||
logger.error(bad_signature_error)
|
||||
|
||||
|
||||
def test_slow_retrieval_error(self):
|
||||
slow_signature_error = tuf.SlowRetrievalError('bad_role')
|
||||
logger.error(slow_signature_error)
|
||||
|
||||
|
||||
# Run the unit tests.
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
|
@ -182,17 +182,15 @@ def test_create_keydb_from_root_metadata(self):
|
|||
keyid = KEYS[0]['keyid']
|
||||
rsakey2 = KEYS[1]
|
||||
keyid2 = KEYS[1]['keyid']
|
||||
keydict = {keyid: rsakey, keyid2: rsakey2, keyid: rsakey}
|
||||
|
||||
keydict = {keyid: rsakey, keyid2: rsakey2}
|
||||
|
||||
# Add a duplicate 'keyid' to log/trigger a 'tuf.KeyAlreadyExistsError'
|
||||
# block (loading continues).
|
||||
roledict = {'Root': {'keyids': [keyid], 'threshold': 1},
|
||||
'Targets': {'keyids': [keyid2], 'threshold': 1}}
|
||||
'Targets': {'keyids': [keyid2, keyid], 'threshold': 1}}
|
||||
version = 8
|
||||
consistent_snapshot = False
|
||||
expires = '1985-10-21T01:21:00Z'
|
||||
|
||||
tuf.keydb.add_key(rsakey)
|
||||
root_metadata = tuf.formats.RootFile.make_metadata(version,
|
||||
expires,
|
||||
keydict, roledict,
|
||||
|
|
@ -231,7 +229,7 @@ def test_create_keydb_from_root_metadata(self):
|
|||
rsakey3['keytype'] = 'bad_keytype'
|
||||
keydict[keyid3] = rsakey3
|
||||
version = 8
|
||||
expires = '1985-10-21T01:21:00Z'
|
||||
expires = '1985-10-21T01:21:00Z'
|
||||
|
||||
root_metadata = tuf.formats.RootFile.make_metadata(version,
|
||||
expires,
|
||||
|
|
|
|||
|
|
@ -235,8 +235,19 @@ def test_verify_signature(self):
|
|||
|
||||
# Passing incorrect number of arguments.
|
||||
self.assertRaises(TypeError, KEYS.verify_signature)
|
||||
|
||||
|
||||
|
||||
# Verify that the pure python 'ed25519' base case (triggered if 'pynacl' is
|
||||
# unavailable) is executed in tuf.keys.verify_signature().
|
||||
KEYS._ED25519_CRYPTO_LIBRARY = 'invalid'
|
||||
KEYS._available_crypto_libraries = ['invalid']
|
||||
verified = KEYS.verify_signature(self.ed25519key_dict, ed25519_signature, DATA)
|
||||
self.assertTrue(verified, "Incorrect signature.")
|
||||
|
||||
# Reset to the expected available crypto libraries.
|
||||
KEYS._ED25519_CRYPTO_LIBRARY = 'pynacl'
|
||||
KEYS._available_crypto_libraries = ['ed25519', 'pycrypto', 'pynacl']
|
||||
|
||||
|
||||
|
||||
def test_create_rsa_encrypted_pem(self):
|
||||
# Test valid arguments.
|
||||
|
|
|
|||
|
|
@ -116,7 +116,7 @@ def test_add_console_handler(self):
|
|||
raise TypeError('Test exception output in the console.')
|
||||
|
||||
except TypeError as e:
|
||||
logger.error(e)
|
||||
logger.exception(e)
|
||||
|
||||
|
||||
def test_remove_console_handler(self):
|
||||
|
|
|
|||
|
|
@ -66,6 +66,7 @@ def test_generate_rsa_public_and_private(self):
|
|||
|
||||
def test_create_rsa_signature(self):
|
||||
global private_rsa
|
||||
global public_rsa
|
||||
data = 'The quick brown fox jumps over the lazy dog'.encode('utf-8')
|
||||
signature, method = pycrypto.create_rsa_signature(private_rsa, data)
|
||||
|
||||
|
|
@ -83,9 +84,15 @@ def test_create_rsa_signature(self):
|
|||
pycrypto.create_rsa_signature, '', data)
|
||||
|
||||
# Check for invalid 'data'.
|
||||
pycrypto.create_rsa_signature(private_rsa, '')
|
||||
|
||||
self.assertRaises(tuf.CryptoError,
|
||||
pycrypto.create_rsa_signature, private_rsa, 123)
|
||||
|
||||
# Check for missing private key.
|
||||
self.assertRaises(tuf.CryptoError,
|
||||
pycrypto.create_rsa_signature, public_rsa, data)
|
||||
|
||||
|
||||
def test_verify_rsa_signature(self):
|
||||
global public_rsa
|
||||
|
|
@ -209,6 +216,7 @@ def test_create_rsa_public_and_private_from_encrypted_pem(self):
|
|||
self.assertRaises(tuf.FormatError,
|
||||
pycrypto.create_rsa_public_and_private_from_encrypted_pem,
|
||||
pem_rsakey, ['pw'])
|
||||
|
||||
self.assertRaises(tuf.CryptoError,
|
||||
pycrypto.create_rsa_public_and_private_from_encrypted_pem,
|
||||
'invalid_pem', passphrase)
|
||||
|
|
|
|||
|
|
@ -647,16 +647,27 @@ def test_sign_metadata(self):
|
|||
|
||||
root_private_keypath = os.path.join(keystore_path, 'root_key')
|
||||
root_private_key = \
|
||||
repo_lib.import_rsa_privatekey_from_file(root_private_keypath,
|
||||
'password')
|
||||
repo_lib.import_rsa_privatekey_from_file(root_private_keypath, 'password')
|
||||
|
||||
# Sign with a valid, but not a threshold, key.
|
||||
targets_private_keypath = os.path.join(keystore_path, 'targets_key')
|
||||
targets_private_key = \
|
||||
repo_lib.import_rsa_privatekey_from_file(targets_private_keypath,
|
||||
'password')
|
||||
|
||||
# sign_metadata() expects the private key 'root_metadata' to be in
|
||||
# 'tuf.keydb'. Remove any public keys that may be loaded before
|
||||
# adding private key, otherwise a 'tuf.KeyAlreadyExists' exception is
|
||||
# raised.
|
||||
tuf.keydb.remove_key(root_private_key['keyid'])
|
||||
tuf.keydb.add_key(root_private_key)
|
||||
|
||||
tuf.keydb.remove_key(targets_private_key['keyid'])
|
||||
tuf.keydb.add_key(targets_private_key)
|
||||
|
||||
root_keyids.extend(tuf.roledb.get_role_keyids('targets'))
|
||||
# Add the snapshot's public key (to test whether non-private keys are
|
||||
# ignored by sign_metadata()).
|
||||
root_keyids.extend(tuf.roledb.get_role_keyids('snapshot'))
|
||||
root_signable = repo_lib.sign_metadata(root_metadata, root_keyids,
|
||||
root_filename)
|
||||
self.assertTrue(tuf.formats.SIGNABLE_SCHEMA.matches(root_signable))
|
||||
|
|
@ -757,6 +768,85 @@ def test__check_directory(self):
|
|||
|
||||
|
||||
|
||||
def test__generate_and_write_metadata(self):
|
||||
# Test for invalid, or unsupported, rolename.
|
||||
# Load the root metadata provided in 'tuf/tests/repository_data/'.
|
||||
root_filepath = os.path.join('repository_data', 'repository',
|
||||
'metadata', 'root.json')
|
||||
root_signable = tuf.util.load_json_file(root_filepath)
|
||||
|
||||
# _generate_and_write_metadata() expects the top-level roles
|
||||
# (specifically 'snapshot') and keys to be available in 'tuf.roledb'.
|
||||
tuf.roledb.create_roledb_from_root_metadata(root_signable['signed'])
|
||||
temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory)
|
||||
targets_directory = os.path.join(temporary_directory, 'targets')
|
||||
os.mkdir(targets_directory)
|
||||
repository_directory = os.path.join(temporary_directory, 'repository')
|
||||
metadata_directory = os.path.join(repository_directory,
|
||||
repo_lib.METADATA_STAGED_DIRECTORY_NAME)
|
||||
targets_metadata = os.path.join('repository_data', 'repository', 'metadata',
|
||||
'targets.json')
|
||||
obsolete_metadata = os.path.join(metadata_directory, 'targets',
|
||||
'obsolete_role.json')
|
||||
tuf.util.ensure_parent_dir(obsolete_metadata)
|
||||
shutil.copyfile(targets_metadata, obsolete_metadata)
|
||||
|
||||
# Test for an invalid, or unsupported, rolename.
|
||||
roleinfo = {'keyids': ['123'], 'threshold': 1}
|
||||
tuf.roledb.add_role('bad_rolename', roleinfo)
|
||||
self.assertRaises(tuf.Error,
|
||||
tuf.repository_lib._generate_and_write_metadata,
|
||||
'bad_rolename', 'bad_rolename.json', False,
|
||||
targets_directory, metadata_directory)
|
||||
|
||||
# Verify that obsolete metadata (a metadata file exists on disk, but the
|
||||
# role is unavailable in 'tuf.roledb'). First add the obsolete
|
||||
# role to 'tuf.roledb' so that its metadata file can be written to disk.
|
||||
targets_roleinfo = tuf.roledb.get_roleinfo('targets')
|
||||
targets_roleinfo['version'] = 1
|
||||
expiration = \
|
||||
tuf.formats.unix_timestamp_to_datetime(int(time.time() + 86400))
|
||||
expiration = expiration.isoformat() + 'Z'
|
||||
targets_roleinfo['expires'] = expiration
|
||||
tuf.roledb.add_role('targets/obsolete_role', targets_roleinfo)
|
||||
|
||||
snapshot_filepath = os.path.join('repository_data', 'repository',
|
||||
'metadata', 'snapshot.json')
|
||||
snapshot_signable = tuf.util.load_json_file(snapshot_filepath)
|
||||
tuf.roledb.remove_role('targets/obsolete_role')
|
||||
self.assertTrue(os.path.exists(os.path.join(metadata_directory,
|
||||
'targets/obsolete_role.json')))
|
||||
tuf.repository_lib._delete_obsolete_metadata(metadata_directory,
|
||||
snapshot_signable['signed'],
|
||||
False)
|
||||
self.assertFalse(os.path.exists(metadata_directory + 'targets/obsolete_role.json'))
|
||||
|
||||
|
||||
|
||||
def test__remove_invalid_and_duplicate_signatures(self):
|
||||
# Remove duplicate PSS signatures (same key generates valid, but different
|
||||
# signatures). First load a valid signable (in this case, the root role).
|
||||
root_filepath = os.path.join('repository_data', 'repository',
|
||||
'metadata', 'root.json')
|
||||
root_signable = tuf.util.load_json_file(root_filepath)
|
||||
key_filepath = os.path.join('repository_data', 'keystore', 'root_key')
|
||||
root_rsa_key = repo_lib.import_rsa_privatekey_from_file(key_filepath,
|
||||
'password')
|
||||
|
||||
# Append the new valid, but duplicate PSS signature, and test that
|
||||
# duplicates are removed.
|
||||
new_pss_signature = tuf.keys.create_signature(root_rsa_key,
|
||||
root_signable['signed'])
|
||||
root_signable['signatures'].append(new_pss_signature)
|
||||
expected_number_of_signatures = len(root_signable['signatures'])
|
||||
tuf.repository_lib._remove_invalid_and_duplicate_signatures(root_signable)
|
||||
self.assertEqual(len(root_signable), expected_number_of_signatures)
|
||||
|
||||
# Test that invalid keyid are ignored.
|
||||
root_signable['signatures'][0]['keyid'] = '404'
|
||||
tuf.repository_lib._remove_invalid_and_duplicate_signatures(root_signable)
|
||||
|
||||
|
||||
# Run the test cases.
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
|
|
|||
|
|
@ -114,7 +114,7 @@ def test_init(self):
|
|||
def test_write_and_write_partial(self):
|
||||
# Test creation of a TUF repository.
|
||||
#
|
||||
# 1. Load public and private keys.
|
||||
# 1. Import public and private keys.
|
||||
# 2. Add verification keys.
|
||||
# 3. Load signing keys.
|
||||
# 4. Add target files.
|
||||
|
|
@ -245,26 +245,37 @@ def test_write_and_write_partial(self):
|
|||
repository.status()
|
||||
|
||||
# Verify status() does not raise 'tuf.InsufficientKeysError' if a top-level
|
||||
# role does not contain a threshold of keys.
|
||||
# role does and 'targets/role1' do not contain a threshold of keys.
|
||||
root_roleinfo = tuf.roledb.get_roleinfo('root')
|
||||
old_threshold = root_roleinfo['threshold']
|
||||
old_threshold = root_roleinfo['threshold']
|
||||
root_roleinfo['threshold'] = 10
|
||||
role1_roleinfo = tuf.roledb.get_roleinfo('targets/role1')
|
||||
old_role1_threshold = role1_roleinfo['threshold']
|
||||
role1_roleinfo['threshold'] = 10
|
||||
tuf.roledb.update_roleinfo('root', root_roleinfo)
|
||||
tuf.roledb.update_roleinfo('targets/role1', role1_roleinfo)
|
||||
repository.status()
|
||||
|
||||
# Restore the original threshold value.
|
||||
# Restore the original threshold values.
|
||||
root_roleinfo['threshold'] = old_threshold
|
||||
tuf.roledb.update_roleinfo('root', root_roleinfo)
|
||||
|
||||
role1_roleinfo['threshold'] = old_role1_threshold
|
||||
tuf.roledb.update_roleinfo('targets/role1', role1_roleinfo)
|
||||
|
||||
|
||||
# Verify status() does not raise 'tuf.UnsignedMetadataError' if any of the
|
||||
# the top-level roles are improperly signed.
|
||||
# the top-level roles and 'targets/role1' are improperly signed.
|
||||
repository.root.unload_signing_key(root_privkey)
|
||||
repository.root.load_signing_key(targets_privkey)
|
||||
repository.targets('role1').unload_signing_key(role1_privkey)
|
||||
repository.targets('role1').load_signing_key(targets_privkey)
|
||||
repository.status()
|
||||
|
||||
# Reset Root and verify Targets.
|
||||
# Reset Root and 'targets/role1', and verify Targets.
|
||||
repository.root.unload_signing_key(targets_privkey)
|
||||
repository.root.load_signing_key(root_privkey)
|
||||
repository.targets('role1').unload_signing_key(targets_privkey)
|
||||
repository.targets('role1').load_signing_key(role1_privkey)
|
||||
repository.targets.unload_signing_key(targets_privkey)
|
||||
repository.targets.load_signing_key(snapshot_privkey)
|
||||
repository.status()
|
||||
|
|
@ -987,8 +998,11 @@ def test_add_targets(self):
|
|||
target1_filepath = os.path.join(self.targets_directory, 'file1.txt')
|
||||
target2_filepath = os.path.join(self.targets_directory, 'file2.txt')
|
||||
target3_filepath = os.path.join(self.targets_directory, 'file3.txt')
|
||||
|
||||
target_files = [target1_filepath, target2_filepath, target3_filepath]
|
||||
|
||||
# Add a 'target1_filepath' duplicate for testing purposes
|
||||
# ('target1_filepath' should not be added twice.)
|
||||
target_files = \
|
||||
[target1_filepath, target2_filepath, target3_filepath, target1_filepath]
|
||||
self.targets_object.add_targets(target_files)
|
||||
|
||||
self.assertEqual(len(self.targets_object.target_files), 3)
|
||||
|
|
@ -997,17 +1011,19 @@ def test_add_targets(self):
|
|||
|
||||
|
||||
# Test improperly formatted arguments.
|
||||
self.assertRaises(tuf.FormatError, self.targets_object.add_targets,
|
||||
3)
|
||||
|
||||
self.assertRaises(tuf.FormatError, self.targets_object.add_targets, 3)
|
||||
|
||||
# Test invalid filepath argument (i.e., non-existent or invalid file.)
|
||||
self.assertRaises(tuf.Error, self.targets_object.add_target,
|
||||
self.assertRaises(tuf.Error, self.targets_object.add_targets,
|
||||
['non-existent.txt'])
|
||||
self.assertRaises(tuf.Error, self.targets_object.add_target,
|
||||
self.assertRaises(tuf.Error, self.targets_object.add_targets,
|
||||
[target1_filepath, target2_filepath, 'non-existent.txt'])
|
||||
self.assertRaises(tuf.Error, self.targets_object.add_target,
|
||||
self.temporary_directory)
|
||||
self.assertRaises(tuf.Error, self.targets_object.add_targets,
|
||||
[self.temporary_directory])
|
||||
temp_directory = os.path.join(self.targets_directory, 'temp')
|
||||
os.mkdir(temp_directory)
|
||||
self.assertRaises(tuf.Error, self.targets_object.add_targets,
|
||||
[temp_directory])
|
||||
|
||||
|
||||
|
||||
|
|
@ -1032,6 +1048,10 @@ def test_remove_target(self):
|
|||
# Test invalid filepath argument (i.e., non-existent or invalid file.)
|
||||
self.assertRaises(tuf.Error, self.targets_object.remove_target,
|
||||
'/non-existent.txt')
|
||||
# Test for filepath that hasn't been added yet.
|
||||
target5_filepath = os.path.join(self.targets_directory, 'file5.txt')
|
||||
self.assertRaises(tuf.Error, self.targets_object.remove_target,
|
||||
target5_filepath)
|
||||
|
||||
|
||||
|
||||
|
|
@ -1076,6 +1096,25 @@ def test_delegate(self):
|
|||
|
||||
self.assertEqual(self.targets_object.get_delegated_rolenames(),
|
||||
['targets/tuf'])
|
||||
|
||||
# Try to delegate to a role that has already been delegated.
|
||||
self.assertRaises(tuf.Error, self.targets_object.delegate, rolename,
|
||||
public_keys, list_of_targets, threshold, backtrack=True,
|
||||
restricted_paths=restricted_paths,
|
||||
path_hash_prefixes=path_hash_prefixes)
|
||||
|
||||
# Test for targets that do not exist under the targets directory.
|
||||
self.targets_object.revoke(rolename)
|
||||
self.assertRaises(tuf.Error, self.targets_object.delegate, rolename,
|
||||
public_keys, ['non-existent.txt'], threshold,
|
||||
backtrack=True, restricted_paths=restricted_paths,
|
||||
path_hash_prefixes=path_hash_prefixes)
|
||||
|
||||
# Test for targets that do not exist under the targets directory.
|
||||
self.assertRaises(tuf.Error, self.targets_object.delegate, rolename,
|
||||
public_keys, list_of_targets, threshold,
|
||||
backtrack=True, restricted_paths=['non-existent.txt'],
|
||||
path_hash_prefixes=path_hash_prefixes)
|
||||
|
||||
|
||||
# Test improperly formatted arguments.
|
||||
|
|
|
|||
|
|
@ -257,16 +257,18 @@ def test_with_tuf_mode_2(self):
|
|||
# by sending just several characters every few seconds.
|
||||
|
||||
server_process = self._start_slow_server('mode_2')
|
||||
|
||||
client_filepath = os.path.join(self.client_directory, 'file1.txt')
|
||||
original_average_download_speed = tuf.conf.MIN_AVERAGE_DOWNLOAD_SPEED
|
||||
tuf.conf.MIN_AVERAGE_DOWNLOAD_SPEED = 1
|
||||
|
||||
try:
|
||||
file1_target = self.repository_updater.target('file1.txt')
|
||||
self.repository_updater.download_target(file1_target, self.client_directory)
|
||||
|
||||
# Verify that the specific 'tuf.SlowRetrievalError' exception is raised by
|
||||
# each mirror. 'file1.txt' should be large enough to trigger a slow
|
||||
# retrieval attack, otherwise the expected exception may not be consistently
|
||||
# raised.
|
||||
# retrieval attack, otherwise the expected exception may not be
|
||||
# consistently raised.
|
||||
except tuf.NoWorkingMirrorError as exception:
|
||||
for mirror_url, mirror_error in six.iteritems(exception.mirror_errors):
|
||||
url_prefix = self.repository_mirrors['mirror1']['url_prefix']
|
||||
|
|
@ -283,6 +285,7 @@ def test_with_tuf_mode_2(self):
|
|||
|
||||
finally:
|
||||
self._stop_slow_server(server_process)
|
||||
tuf.conf.MIN_AVERAGE_DOWNLOAD_SPEED = original_average_download_speed
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
|
|
|||
|
|
@ -443,8 +443,6 @@ def test_2__import_delegations(self):
|
|||
self.repository_updater.metadata['current']['targets']\
|
||||
['delegations']['keys'][existing_keyid]['keyid'] = '123'
|
||||
|
||||
print(repr(self.repository_updater.metadata['current']['targets']\
|
||||
['delegations']['keys'][existing_keyid]['keyid']))
|
||||
self.repository_updater._import_delegations('targets')
|
||||
#self.assertRaises(tuf.Error, self.repository_updater._import_delegations,
|
||||
# 'targets')
|
||||
|
|
@ -457,12 +455,9 @@ def test_2__import_delegations(self):
|
|||
# one of the roles loaded from parent role's 'delegations'.
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
def test_2__fileinfo_has_changed(self):
|
||||
# Verify that the method returns 'False' if file info was not changed.
|
||||
root_filepath = os.path.join(self.client_metadata_current, 'root.json')
|
||||
|
|
@ -875,6 +870,39 @@ def test_5_targets_of_role(self):
|
|||
|
||||
|
||||
|
||||
def test_6_refresh_tagets_metadata_chain(self):
|
||||
# NOTE: This function does not refresh the role specified in the argument,
|
||||
# only its parent roles.
|
||||
|
||||
# Remove the metadata of the delegated roles.
|
||||
os.remove(os.path.join(self.client_metadata_current, 'targets.json'))
|
||||
os.remove(os.path.join(self.client_metadata_current, 'targets', 'role1.json'))
|
||||
|
||||
# Test: normal case.
|
||||
metadata_list = \
|
||||
self.repository_updater.refresh_targets_metadata_chain('targets')
|
||||
|
||||
"""
|
||||
print(repr(metadata_list))
|
||||
self.assertEqual(len(metadata_list), 0)
|
||||
self.assertTrue('targets' in metadata_list)
|
||||
|
||||
# Verify that the expected role files were downloaded and installed.
|
||||
os.path.exists(os.path.join(self.client_metadata_current, 'targets.json'))
|
||||
|
||||
self.assertTrue('targets' in self.repository_updater.metadata['current'])
|
||||
self.assertFalse('targets/role1' in self.repository_updater.metadata['current'])
|
||||
"""
|
||||
# Test: Invalid arguments.
|
||||
# refresh_targets_metadata_chain() expects a string rolename.
|
||||
self.assertRaises(tuf.FormatError,
|
||||
self.repository_updater.refresh_targets_metadata_chain,
|
||||
8)
|
||||
self.assertRaises(tuf.RepositoryError,
|
||||
self.repository_updater.refresh_targets_metadata_chain,
|
||||
'unknown_rolename')
|
||||
|
||||
|
||||
|
||||
def test_6_target(self):
|
||||
# Setup
|
||||
|
|
@ -983,6 +1011,7 @@ def test_6_download_target(self):
|
|||
target_fileinfo = self.repository_updater.target(target_filepath)
|
||||
self.repository_updater.download_target(target_fileinfo,
|
||||
destination_directory)
|
||||
|
||||
download_filepath = \
|
||||
os.path.join(destination_directory, target_filepath.lstrip('/'))
|
||||
self.assertTrue(os.path.exists(download_filepath))
|
||||
|
|
@ -994,6 +1023,11 @@ def test_6_download_target(self):
|
|||
if 'custom' in target_fileinfo['fileinfo']:
|
||||
download_targetfileinfo['custom'] = target_fileinfo['fileinfo']['custom']
|
||||
self.assertEqual(target_fileinfo['fileinfo'], download_targetfileinfo)
|
||||
|
||||
# Test when consistent snapshots is set.
|
||||
self.repository_updater.consistent_snapshots = True
|
||||
self.repository_updater.download_target(target_fileinfo,
|
||||
destination_directory)
|
||||
|
||||
# Test: Invalid arguments.
|
||||
self.assertRaises(tuf.FormatError, self.repository_updater.download_target,
|
||||
|
|
@ -1183,11 +1217,50 @@ def test_9__get_target_hash(self):
|
|||
self.assertEqual(self.repository_updater._get_target_hash(filepath), target_hash)
|
||||
|
||||
# Test for improperly formatted argument.
|
||||
self.assertRaises(tuf.FormatError, tuf.util.get_target_hash, 8)
|
||||
#self.assertRaises(tuf.FormatError, self.repository_updater._get_target_hash, 8)
|
||||
|
||||
|
||||
|
||||
|
||||
def test_10__hard_check_file_length(self):
|
||||
# Test for exception if file object is not equal to trusted file length.
|
||||
temp_file_object = tuf.util.TempFile()
|
||||
temp_file_object.write(b'X')
|
||||
temp_file_object.seek(0)
|
||||
self.assertRaises(tuf.DownloadLengthMismatchError,
|
||||
self.repository_updater._hard_check_file_length,
|
||||
temp_file_object, 10)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
def test_10__soft_check_file_length(self):
|
||||
# Test for exception if file object is not equal to trusted file length.
|
||||
temp_file_object = tuf.util.TempFile()
|
||||
temp_file_object.write(b'XXX')
|
||||
temp_file_object.seek(0)
|
||||
self.assertRaises(tuf.DownloadLengthMismatchError,
|
||||
self.repository_updater._soft_check_file_length,
|
||||
temp_file_object, 1)
|
||||
|
||||
|
||||
|
||||
def test_10__targets_of_role(self):
|
||||
# Test for non-existent role.
|
||||
self.assertRaises(tuf.UnknownRoleError,
|
||||
self.repository_updater._targets_of_role,
|
||||
'non-existent-role')
|
||||
|
||||
# Test for role that hasn't been loaded yet.
|
||||
del self.repository_updater.metadata['current']['targets']
|
||||
self.assertEqual(len(self.repository_updater._targets_of_role('targets')),
|
||||
0)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
def _load_role_keys(keystore_directory):
|
||||
|
||||
|
|
|
|||
|
|
@ -219,26 +219,32 @@ def test_A6_tempfile_decompress_temp_file_object(self):
|
|||
for arg in bogus_args:
|
||||
self.assertRaises(tuf.Error,
|
||||
self.temp_fileobj.decompress_temp_file_object, arg)
|
||||
|
||||
# Test for a valid util.decompress_temp_file_object() call.
|
||||
self.temp_fileobj.decompress_temp_file_object('gzip')
|
||||
self.assertEqual(self.temp_fileobj.read(), fileobj.read())
|
||||
|
||||
# Checking the content of the TempFile's '_orig_file' instance.
|
||||
check_compressed_original = self.make_temp_file()
|
||||
with open(check_compressed_original, 'wb') as file_object:
|
||||
file_object.write(self.temp_fileobj._orig_file.read())
|
||||
self.temp_fileobj._orig_file.seek(0)
|
||||
original_content = self.temp_fileobj._orig_file.read()
|
||||
file_object.write(original_content)
|
||||
|
||||
data_in_orig_file = self._decompress_file(check_compressed_original)
|
||||
fileobj.seek(0)
|
||||
self.assertEqual(data_in_orig_file, fileobj.read())
|
||||
|
||||
|
||||
# Try decompressing once more.
|
||||
self.assertRaises(tuf.Error,
|
||||
self.temp_fileobj.decompress_temp_file_object, 'gzip')
|
||||
|
||||
# Test decompression of invalid gzip file.
|
||||
temp_file = tuf.util.TempFile()
|
||||
fileobj.seek(0)
|
||||
temp_file.write(fileobj.read())
|
||||
temp_file.decompress_temp_file_object('gzip')
|
||||
temp_file.write(b'bad zip')
|
||||
contents = temp_file.read()
|
||||
self.assertRaises(tuf.DecompressionError,
|
||||
temp_file.decompress_temp_file_object, 'gzip')
|
||||
|
||||
|
||||
|
||||
|
|
@ -314,6 +320,12 @@ def test_B3_file_in_confined_directories(self):
|
|||
|
||||
def test_B4_import_json(self):
|
||||
self.assertTrue('json' in sys.modules)
|
||||
json_module = tuf.util.import_json()
|
||||
self.assertTrue(json_module is not None)
|
||||
|
||||
# Test import_json() when 'util._json_moduel' is non-None.
|
||||
tuf.util._json_module = 'junk_module'
|
||||
self.assertEqual(tuf.util.import_json(), 'junk_module')
|
||||
|
||||
|
||||
|
||||
|
|
@ -424,7 +436,7 @@ def test_C3_paths_are_consistent_with_hash_prefixes(self):
|
|||
path_hash_prefixes = ['e3a3', '8fae', 'd543']
|
||||
list_of_targets = ['/file1.txt', '/README.txt', '/warehouse/file2.txt']
|
||||
|
||||
# Ensure the paths of 'list_of_targets' each have the epected path hash
|
||||
# Ensure the paths of 'list_of_targets' each have the expected path hash
|
||||
# prefix listed in 'path_hash_prefixes'.
|
||||
for filepath in list_of_targets:
|
||||
self.assertTrue(tuf.util.get_target_hash(filepath)[0:4] in path_hash_prefixes)
|
||||
|
|
|
|||
|
|
@ -505,20 +505,16 @@ def _check_content_length(reported_length, required_length, strict_length=True):
|
|||
|
||||
logger.debug('The server reported a length of '+repr(reported_length)+' bytes.')
|
||||
comparison_result = None
|
||||
|
||||
if reported_length < required_length:
|
||||
comparison_result = 'less than'
|
||||
|
||||
try:
|
||||
if reported_length < required_length:
|
||||
comparison_result = 'less than'
|
||||
|
||||
elif reported_length > required_length:
|
||||
comparison_result = 'greater than'
|
||||
|
||||
else:
|
||||
comparison_result = 'equal to'
|
||||
|
||||
except:
|
||||
logger.exception('Could not check reported and required lengths.')
|
||||
elif reported_length > required_length:
|
||||
comparison_result = 'greater than'
|
||||
|
||||
else:
|
||||
comparison_result = 'equal to'
|
||||
|
||||
if strict_length:
|
||||
message = 'The reported length is '+comparison_result+' the required '+\
|
||||
'length of '+repr(required_length)+' bytes.'
|
||||
|
|
|
|||
10
tuf/keydb.py
10
tuf/keydb.py
|
|
@ -90,9 +90,9 @@ def create_keydb_from_root_metadata(root_metadata):
|
|||
# Clear the key database.
|
||||
_keydb_dict.clear()
|
||||
|
||||
# Iterate through the keys found in 'root_metadata' by converting
|
||||
# them to 'RSAKEY_SCHEMA' if their type is 'rsa', and then
|
||||
# adding them the database. Duplicates are avoided.
|
||||
# Iterate the keys found in 'root_metadata' by converting them to
|
||||
# 'RSAKEY_SCHEMA' if their type is 'rsa', and then adding them to the
|
||||
# database.
|
||||
for keyid, key_metadata in six.iteritems(root_metadata['keys']):
|
||||
if key_metadata['keytype'] in _SUPPORTED_KEY_TYPES:
|
||||
# 'key_metadata' is stored in 'KEY_SCHEMA' format. Call
|
||||
|
|
@ -102,7 +102,9 @@ def create_keydb_from_root_metadata(root_metadata):
|
|||
try:
|
||||
add_key(key_dict, keyid)
|
||||
|
||||
except tuf.KeyAlreadyExistsError as e:
|
||||
# Although keyid duplicates should *not* occur (unique dict keys), log a
|
||||
# warning and continue.
|
||||
except tuf.KeyAlreadyExistsError as e: # pragma: no cover
|
||||
logger.warning(e)
|
||||
continue
|
||||
|
||||
|
|
|
|||
|
|
@ -844,6 +844,7 @@ def verify_signature(key_dict, signature, data):
|
|||
valid_signature = tuf.ed25519_keys.verify_signature(public,
|
||||
method, sig, data,
|
||||
use_pynacl=True)
|
||||
|
||||
# Fall back to the optimized pure python implementation of ed25519.
|
||||
else: # pragma: no cover
|
||||
valid_signature = tuf.ed25519_keys.verify_signature(public,
|
||||
|
|
|
|||
|
|
@ -304,13 +304,13 @@ def create_rsa_signature(private_key, data):
|
|||
pkcs1_pss_signer = Crypto.Signature.PKCS1_PSS.new(rsa_key_object)
|
||||
signature = pkcs1_pss_signer.sign(sha256_object)
|
||||
|
||||
except ValueError:
|
||||
except ValueError: #pragma: no cover
|
||||
raise tuf.CryptoError('The RSA key too small for given hash algorithm.')
|
||||
|
||||
except TypeError:
|
||||
raise tuf.CryptoError('Missing required RSA private key.')
|
||||
|
||||
except IndexError:
|
||||
except IndexError: # pragma: no cover
|
||||
message = 'An RSA signature cannot be generated: ' + str(e)
|
||||
raise tuf.CryptoError(message)
|
||||
|
||||
|
|
@ -585,8 +585,9 @@ def create_rsa_public_and_private_from_encrypted_pem(encrypted_pem, passphrase):
|
|||
public = rsa_pubkey.exportKey(format='PEM')
|
||||
|
||||
# PyCrypto raises 'ValueError' if the public or private keys cannot be
|
||||
# exported. See 'Crypto.PublicKey.RSA'.
|
||||
except (ValueError):
|
||||
# exported. See 'Crypto.PublicKey.RSA'. 'ValueError' should not be raised
|
||||
# if the 'Crypto.PublicKey.RSA.importKey() call above passed.
|
||||
except (ValueError): #pragma: no cover
|
||||
message = 'The public and private keys cannot be exported in PEM format.'
|
||||
raise tuf.CryptoError(message)
|
||||
|
||||
|
|
|
|||
|
|
@ -162,6 +162,8 @@ def _generate_and_write_metadata(rolename, metadata_filename, write_partial,
|
|||
|
||||
_log_warning_if_expires_soon(TIMESTAMP_FILENAME, roleinfo['expires'],
|
||||
TIMESTAMP_EXPIRES_WARN_SECONDS)
|
||||
else:
|
||||
raise tuf.Error('Invalid rolename')
|
||||
|
||||
signable = sign_metadata(metadata, roleinfo['signing_keyids'],
|
||||
metadata_filename)
|
||||
|
|
@ -387,7 +389,7 @@ def _remove_invalid_and_duplicate_signatures(signable):
|
|||
|
||||
# Although valid, it may still need removal if it is a duplicate. Check
|
||||
# the keyid, rather than the signature, to remove duplicate PSS signatures.
|
||||
# PSS may generate multiple different signatures for the same keyid.
|
||||
# PSS may generate multiple different signatures for the same keyid.
|
||||
else:
|
||||
if keyid in signature_keyids:
|
||||
signable['signatures'].remove(signature)
|
||||
|
|
@ -1793,11 +1795,14 @@ def sign_metadata(metadata_object, keyids, filename):
|
|||
for signature in signable['signatures']:
|
||||
if not keyid == signature['keyid']:
|
||||
signatures.append(signature)
|
||||
|
||||
else:
|
||||
continue
|
||||
signable['signatures'] = signatures
|
||||
|
||||
# Generate the signature using the appropriate signing method.
|
||||
if key['keytype'] in SUPPORTED_KEY_TYPES:
|
||||
if len(key['keyval']['private']):
|
||||
if 'private' in key['keyval']:
|
||||
signed = signable['signed']
|
||||
signature = tuf.keys.create_signature(key, signed)
|
||||
signable['signatures'].append(signature)
|
||||
|
|
|
|||
|
|
@ -102,7 +102,7 @@
|
|||
try:
|
||||
tuf.keys.check_crypto_libraries(['rsa', 'ed25519', 'general'])
|
||||
|
||||
except tuf.UnsupportedLibraryError as e:
|
||||
except tuf.UnsupportedLibraryError as e: #pragma: no cover
|
||||
message = 'Warning: The repository and developer tools require additional' + \
|
||||
' libraries and can be installed as follows:\n $ pip install tuf[tools]'
|
||||
logger.warn(message)
|
||||
|
|
@ -1795,7 +1795,7 @@ def add_targets(self, list_of_targets):
|
|||
# times these checks are performed.
|
||||
for target in list_of_targets:
|
||||
filepath = os.path.abspath(target)
|
||||
|
||||
|
||||
if not filepath.startswith(self._targets_directory+os.sep):
|
||||
message = repr(filepath) + ' is not under the Repository\'s targets ' +\
|
||||
'directory: ' + repr(self._targets_directory)
|
||||
|
|
@ -1813,6 +1813,9 @@ def add_targets(self, list_of_targets):
|
|||
for relative_target in relative_list_of_targets:
|
||||
if relative_target not in roleinfo['paths']:
|
||||
roleinfo['paths'].update({relative_target: {}})
|
||||
|
||||
else:
|
||||
continue
|
||||
|
||||
tuf.roledb.update_roleinfo(self.rolename, roleinfo)
|
||||
|
||||
|
|
|
|||
57
tuf/util.py
57
tuf/util.py
|
|
@ -317,8 +317,12 @@ def decompress_temp_file_object(self, compression):
|
|||
self._orig_file = self.temporary_file
|
||||
|
||||
try:
|
||||
self.temporary_file = gzip.GzipFile(fileobj=self.temporary_file,
|
||||
mode='rb')
|
||||
gzip_file_object = gzip.GzipFile(fileobj=self.temporary_file, mode='rb')
|
||||
uncompressed_content = gzip_file_object.read()
|
||||
self.temporary_file = tempfile.NamedTemporaryFile()
|
||||
self.temporary_file.write(uncompressed_content)
|
||||
self.flush()
|
||||
|
||||
except Exception as exception:
|
||||
raise tuf.DecompressionError(exception)
|
||||
|
||||
|
|
@ -661,12 +665,14 @@ def ensure_all_targets_allowed(rolename, list_of_targets, parent_delegations):
|
|||
|
||||
if allowed_child_path_hash_prefixes is not None:
|
||||
consistent = paths_are_consistent_with_hash_prefixes
|
||||
if len(actual_child_targets) > 0:
|
||||
if not consistent(actual_child_targets,
|
||||
allowed_child_path_hash_prefixes):
|
||||
message = repr(rolename) + ' specifies a target that does not' + \
|
||||
' have a path hash prefix listed in its parent role.'
|
||||
raise tuf.ForbiddenTargetError(message)
|
||||
|
||||
# 'actual_child_tarets' (i.e., 'list_of_targets') should have lenth
|
||||
# greater than zero due to the tuf.format check above.
|
||||
if not consistent(actual_child_targets,
|
||||
allowed_child_path_hash_prefixes):
|
||||
message = repr(rolename) + ' specifies a target that does not' + \
|
||||
' have a path hash prefix listed in its parent role.'
|
||||
raise tuf.ForbiddenTargetError(message)
|
||||
|
||||
elif allowed_child_paths is not None:
|
||||
# Check that each delegated target is either explicitly listed or a parent
|
||||
|
|
@ -710,7 +716,7 @@ def ensure_all_targets_allowed(rolename, list_of_targets, parent_delegations):
|
|||
def paths_are_consistent_with_hash_prefixes(paths, path_hash_prefixes):
|
||||
"""
|
||||
<Purpose>
|
||||
Determine whether a list of paths are consistent with theirs alleged
|
||||
Determine whether a list of paths are consistent with their alleged
|
||||
path hash prefixes. By default, the SHA256 hash function is used.
|
||||
|
||||
<Arguments>
|
||||
|
|
@ -743,21 +749,23 @@ def paths_are_consistent_with_hash_prefixes(paths, path_hash_prefixes):
|
|||
# proven otherwise.
|
||||
consistent = False
|
||||
|
||||
if len(paths) > 0 and len(path_hash_prefixes) > 0:
|
||||
for path in paths:
|
||||
path_hash = get_target_hash(path)
|
||||
# Assume that every path is inconsistent until proven otherwise.
|
||||
consistent = False
|
||||
# The format checks above ensure the 'paths' and 'path_hash_prefix' lists
|
||||
# have lengths greater than zero.
|
||||
for path in paths:
|
||||
path_hash = get_target_hash(path)
|
||||
|
||||
# Assume that every path is inconsistent until proven otherwise.
|
||||
consistent = False
|
||||
|
||||
for path_hash_prefix in path_hash_prefixes:
|
||||
if path_hash.startswith(path_hash_prefix):
|
||||
consistent = True
|
||||
break
|
||||
|
||||
# This path has no matching path_hash_prefix. Stop looking further.
|
||||
if not consistent:
|
||||
for path_hash_prefix in path_hash_prefixes:
|
||||
if path_hash.startswith(path_hash_prefix):
|
||||
consistent = True
|
||||
break
|
||||
|
||||
# This path has no matching path_hash_prefix. Stop looking further.
|
||||
if not consistent:
|
||||
break
|
||||
|
||||
return consistent
|
||||
|
||||
|
||||
|
|
@ -836,11 +844,16 @@ def import_json():
|
|||
|
||||
if _json_module is not None:
|
||||
return _json_module
|
||||
|
||||
else:
|
||||
try:
|
||||
module = __import__('json')
|
||||
except ImportError:
|
||||
|
||||
# The 'json' module is available in Python > 2.6, and thus this exception
|
||||
# should not occur in all supported Python installations (> 2.6) of TUF.
|
||||
except ImportError: #pragma: no cover
|
||||
raise ImportError('Could not import the json module')
|
||||
|
||||
else:
|
||||
_json_module = module
|
||||
return module
|
||||
|
|
|
|||
Loading…
Reference in a new issue