Merge pull request #370 from vladimir-v-diaz/develop

Code coverage
This commit is contained in:
Vladimir Diaz 2016-09-01 10:25:16 -04:00 committed by GitHub
commit fbc1265170
18 changed files with 878 additions and 296 deletions

View file

@ -15,6 +15,7 @@ include tuf/_vendor/ed25519/LICENSE
recursive-include docs *.txt
recursive-include docs/papers *.pdf
recursive-include docs/images *.png
recursive-include tuf/scripts *.py
recursive-include examples *
recursive-include tests *.py
recursive-include tests *.pem

View file

@ -112,6 +112,7 @@
packages = find_packages(exclude=['tests']),
extras_require = extras,
scripts = [
'tuf/client/basic_client.py'
'tuf/scripts/basic_client.py',
'tuf/scripts/tuf.py'
]
)

View file

@ -452,8 +452,10 @@ def test_open(self):
self.assertRaises(AttributeError, myUpdater.open, 8)
url = 'http://localhost:8001/targets/file1.txt'
myUpdater.open(url, 'interposition.json')
url = 'http://localhost:8001/targets/file1.txt'
interposition_file = \
os.path.join(self.temporary_directory, 'interposition.json')
myUpdater.open(url, interposition_file)
def test_retrieve(self):
@ -462,7 +464,9 @@ def test_retrieve(self):
self.assertRaises(AttributeError, myUpdater.retrieve, 8)
test_source_url = 'http://localhost:8001/targets/file1.txt'
myUpdater.retrieve(test_source_url, 'interposition.json')
interposition_file = \
os.path.join(self.temporary_directory, 'interposition.json')
myUpdater.retrieve(test_source_url, interposition_file)
#self.assertRaises(tuf.NoWorkingMirrorError, myUpdater.retrieve, test_source_url)

View file

@ -51,25 +51,31 @@ def setUpClass(cls):
def test_generate_rsa_key(self):
_rsakey_dict = KEYS.generate_rsa_key()
default_rsa_library = KEYS._RSA_CRYPTO_LIBRARY
for rsa_crypto_library in ['pycrypto', 'pyca-cryptography']:
KEYS._RSA_CRYPTO_LIBRARY = rsa_crypto_library
_rsakey_dict = KEYS.generate_rsa_key()
# Check if the format of the object returned by generate() corresponds
# to RSAKEY_SCHEMA format.
self.assertEqual(None, tuf.formats.RSAKEY_SCHEMA.check_match(_rsakey_dict),
FORMAT_ERROR_MSG)
# Check if the format of the object returned by generate() corresponds
# to RSAKEY_SCHEMA format.
self.assertEqual(None, tuf.formats.RSAKEY_SCHEMA.check_match(_rsakey_dict),
FORMAT_ERROR_MSG)
# Passing a bit value that is <2048 to generate() - should raise
# 'tuf.FormatError'.
self.assertRaises(tuf.FormatError, KEYS.generate_rsa_key, 555)
# Passing a bit value that is <2048 to generate() - should raise
# 'tuf.FormatError'.
self.assertRaises(tuf.FormatError, KEYS.generate_rsa_key, 555)
# Passing a string instead of integer for a bit value.
self.assertRaises(tuf.FormatError, KEYS.generate_rsa_key, 'bits')
# Passing a string instead of integer for a bit value.
self.assertRaises(tuf.FormatError, KEYS.generate_rsa_key, 'bits')
# NOTE if random bit value >=2048 (not 4096) is passed generate(bits)
# does not raise any errors and returns a valid key.
self.assertTrue(tuf.formats.RSAKEY_SCHEMA.matches(KEYS.generate_rsa_key(2048)))
self.assertTrue(tuf.formats.RSAKEY_SCHEMA.matches(KEYS.generate_rsa_key(4096)))
# NOTE if random bit value >=2048 (not 4096) is passed generate(bits)
# does not raise any errors and returns a valid key.
self.assertTrue(tuf.formats.RSAKEY_SCHEMA.matches(KEYS.generate_rsa_key(2048)))
self.assertTrue(tuf.formats.RSAKEY_SCHEMA.matches(KEYS.generate_rsa_key(4096)))
# Reset to originally set RSA crypto library.
KEYS._RSA_CRYPTO_LIBRARY = default_rsa_library
def test_format_keyval_to_metadata(self):
@ -176,122 +182,149 @@ def test_helper_get_keyid(self):
def test_create_signature(self):
# Creating a signature for 'DATA'.
rsa_signature = KEYS.create_signature(self.rsakey_dict, DATA)
ed25519_signature = KEYS.create_signature(self.ed25519key_dict, DATA)
default_rsa_library = KEYS._RSA_CRYPTO_LIBRARY
for rsa_crypto_library in ['pycrypto', 'pyca-cryptography']:
KEYS._RSA_CRYPTO_LIBRARY = rsa_crypto_library
# Check format of output.
self.assertEqual(None,
tuf.formats.SIGNATURE_SCHEMA.check_match(rsa_signature),
FORMAT_ERROR_MSG)
self.assertEqual(None,
tuf.formats.SIGNATURE_SCHEMA.check_match(ed25519_signature),
FORMAT_ERROR_MSG)
# Creating a signature for 'DATA'.
rsa_signature = KEYS.create_signature(self.rsakey_dict, DATA)
ed25519_signature = KEYS.create_signature(self.ed25519key_dict, DATA)
# Check format of output.
self.assertEqual(None,
tuf.formats.SIGNATURE_SCHEMA.check_match(rsa_signature),
FORMAT_ERROR_MSG)
self.assertEqual(None,
tuf.formats.SIGNATURE_SCHEMA.check_match(ed25519_signature),
FORMAT_ERROR_MSG)
# Removing private key from 'rsakey_dict' - should raise a TypeError.
private = self.rsakey_dict['keyval']['private']
self.rsakey_dict['keyval']['private'] = ''
args = (self.rsakey_dict, DATA)
self.assertRaises(TypeError, KEYS.create_signature, *args)
# Removing private key from 'rsakey_dict' - should raise a TypeError.
private = self.rsakey_dict['keyval']['private']
self.rsakey_dict['keyval']['private'] = ''
args = (self.rsakey_dict, DATA)
self.assertRaises(ValueError, KEYS.create_signature, *args)
# Supplying an incorrect number of arguments.
self.assertRaises(TypeError, KEYS.create_signature)
self.rsakey_dict['keyval']['private'] = private
# Supplying an incorrect number of arguments.
self.assertRaises(TypeError, KEYS.create_signature)
self.rsakey_dict['keyval']['private'] = private
KEYS._RSA_CRYPTO_LIBRARY = default_rsa_library
def test_verify_signature(self):
# Creating a signature of 'DATA' to be verified.
rsa_signature = KEYS.create_signature(self.rsakey_dict, DATA)
ed25519_signature = KEYS.create_signature(self.ed25519key_dict, DATA)
# Verifying the 'signature' of 'DATA'.
verified = KEYS.verify_signature(self.rsakey_dict, rsa_signature, DATA)
self.assertTrue(verified, "Incorrect signature.")
default_rsa_library = KEYS._RSA_CRYPTO_LIBRARY
default_available_libraries = KEYS._available_crypto_libraries
for rsa_crypto_library in ['pycrypto', 'pyca-cryptography']:
KEYS._RSA_CRYPTO_LIBRARY = rsa_crypto_library
# Verifying the 'ed25519_signature' of 'DATA'.
verified = KEYS.verify_signature(self.ed25519key_dict, ed25519_signature, DATA)
self.assertTrue(verified, "Incorrect signature.")
# Creating a signature of 'DATA' to be verified.
rsa_signature = KEYS.create_signature(self.rsakey_dict, DATA)
ed25519_signature = KEYS.create_signature(self.ed25519key_dict, DATA)
# Testing an invalid 'rsa_signature'. Same 'rsa_signature' is passed, with
# 'DATA' different than the original 'DATA' that was used
# in creating the 'rsa_signature'. Function should return 'False'.
# Verifying the 'signature' of 'DATA'.
verified = KEYS.verify_signature(self.rsakey_dict, rsa_signature, DATA)
self.assertTrue(verified, "Incorrect signature.")
# Verifying the 'ed25519_signature' of 'DATA'.
verified = KEYS.verify_signature(self.ed25519key_dict, ed25519_signature, DATA)
self.assertTrue(verified, "Incorrect signature.")
# Testing an invalid 'rsa_signature'. Same 'rsa_signature' is passed, with
# 'DATA' different than the original 'DATA' that was used
# in creating the 'rsa_signature'. Function should return 'False'.
# Modifying 'DATA'.
_DATA = '1111' + DATA + '1111'
# Modifying 'DATA'.
_DATA = '1111' + DATA + '1111'
# Verifying the 'signature' of modified '_DATA'.
verified = KEYS.verify_signature(self.rsakey_dict, rsa_signature, _DATA)
self.assertFalse(verified,
'Returned \'True\' on an incorrect signature.')
# Verifying the 'signature' of modified '_DATA'.
verified = KEYS.verify_signature(self.rsakey_dict, rsa_signature, _DATA)
self.assertFalse(verified,
'Returned \'True\' on an incorrect signature.')
# Modifying 'signature' to pass an incorrect method since only
# 'PyCrypto-PKCS#1 PSS' is accepted.
rsa_signature['method'] = 'Biff'
# Modifying 'signature' to pass an incorrect method since only
# 'PyCrypto-PKCS#1 PSS' is accepted.
rsa_signature['method'] = 'Biff'
args = (self.rsakey_dict, rsa_signature, DATA)
self.assertRaises(tuf.UnknownMethodError, KEYS.verify_signature, *args)
args = (self.rsakey_dict, rsa_signature, DATA)
self.assertRaises(tuf.UnknownMethodError, KEYS.verify_signature, *args)
# Passing incorrect number of arguments.
self.assertRaises(TypeError, KEYS.verify_signature)
# Verify that the pure python 'ed25519' base case (triggered if 'pynacl' is
# unavailable) is executed in tuf.keys.verify_signature().
KEYS._ED25519_CRYPTO_LIBRARY = 'invalid'
KEYS._available_crypto_libraries = ['invalid']
verified = KEYS.verify_signature(self.ed25519key_dict, ed25519_signature, DATA)
self.assertTrue(verified, "Incorrect signature.")
# Passing incorrect number of arguments.
self.assertRaises(TypeError, KEYS.verify_signature)
# Reset to the expected available crypto libraries.
KEYS._ED25519_CRYPTO_LIBRARY = 'pynacl'
KEYS._available_crypto_libraries = ['ed25519', 'pycrypto', 'pynacl']
# Verify that the pure python 'ed25519' base case (triggered if 'pynacl' is
# unavailable) is executed in tuf.keys.verify_signature().
KEYS._ED25519_CRYPTO_LIBRARY = 'invalid'
KEYS._available_crypto_libraries = ['invalid']
verified = KEYS.verify_signature(self.ed25519key_dict, ed25519_signature, DATA)
self.assertTrue(verified, "Incorrect signature.")
# Reset to the expected available crypto libraries.
KEYS._ED25519_CRYPTO_LIBRARY = 'pynacl'
KEYS._available_crypto_libraries = default_available_libraries
KEYS._RSA_CRYPTO_LIBRARY = default_rsa_library
def test_create_rsa_encrypted_pem(self):
# Test valid arguments.
private = self.rsakey_dict['keyval']['private']
passphrase = 'secret'
encrypted_pem = KEYS.create_rsa_encrypted_pem(private, passphrase)
self.assertTrue(tuf.formats.PEMRSA_SCHEMA.matches(encrypted_pem))
# Test improperly formatted arguments.
self.assertRaises(tuf.FormatError, KEYS.create_rsa_encrypted_pem,
8, passphrase)
default_rsa_library = KEYS._RSA_CRYPTO_LIBRARY
for rsa_crypto_library in ['pycrypto', 'pyca-cryptography']:
KEYS._RSA_CRYPTO_LIBRARY = rsa_crypto_library
self.assertRaises(tuf.FormatError, KEYS.create_rsa_encrypted_pem,
private, 8)
# Test valid arguments.
private = self.rsakey_dict['keyval']['private']
passphrase = 'secret'
encrypted_pem = KEYS.create_rsa_encrypted_pem(private, passphrase)
self.assertTrue(tuf.formats.PEMRSA_SCHEMA.matches(encrypted_pem))
# Test for missing required library.
KEYS._RSA_CRYPTO_LIBRARY = 'invalid'
self.assertRaises(tuf.UnsupportedLibraryError, KEYS.create_rsa_encrypted_pem,
private, passphrase)
KEYS._RSA_CRYPTO_LIBRARY = 'pycrypto'
# Try to import the encryped PEM file.
rsakey = KEYS.import_rsakey_from_encrypted_pem(encrypted_pem, passphrase)
self.assertTrue(tuf.formats.RSAKEY_SCHEMA.matches(rsakey))
# Test improperly formatted arguments.
self.assertRaises(tuf.FormatError, KEYS.create_rsa_encrypted_pem,
8, passphrase)
self.assertRaises(tuf.FormatError, KEYS.create_rsa_encrypted_pem,
private, 8)
# Test for missing required library.
KEYS._RSA_CRYPTO_LIBRARY = 'invalid'
self.assertRaises(tuf.UnsupportedLibraryError, KEYS.create_rsa_encrypted_pem,
private, passphrase)
KEYS._RSA_CRYPTO_LIBRARY = 'pycrypto'
KEYS._RSA_CRYPTO_LIBRARY = default_rsa_library
def test_decrypt_key(self):
# Test valid arguments.
passphrase = 'secret'
encrypted_key = KEYS.encrypt_key(self.rsakey_dict, passphrase).encode('utf-8')
decrypted_key = KEYS.decrypt_key(encrypted_key, passphrase)
self.assertTrue(tuf.formats.ANYKEY_SCHEMA.matches(decrypted_key))
default_general_library = KEYS._GENERAL_CRYPTO_LIBRARY
for general_crypto_library in ['pycrypto', 'pyca-cryptography']:
KEYS._GENERAL_CRYPTO_LIBRARY = general_crypto_library
# Test improperly formatted arguments.
self.assertRaises(tuf.FormatError, KEYS.decrypt_key,
8, passphrase)
self.assertRaises(tuf.FormatError, KEYS.decrypt_key,
encrypted_key, 8)
# Test valid arguments.
passphrase = 'secret'
encrypted_key = KEYS.encrypt_key(self.rsakey_dict, passphrase).encode('utf-8')
decrypted_key = KEYS.decrypt_key(encrypted_key, passphrase)
# Test for missing required library.
KEYS._GENERAL_CRYPTO_LIBRARY = 'invalid'
self.assertRaises(tuf.UnsupportedLibraryError, KEYS.decrypt_key,
encrypted_key, passphrase)
KEYS._GENERAL_CRYPTO_LIBRARY = 'pycrypto'
self.assertTrue(tuf.formats.ANYKEY_SCHEMA.matches(decrypted_key))
# Test improperly formatted arguments.
self.assertRaises(tuf.FormatError, KEYS.decrypt_key,
8, passphrase)
self.assertRaises(tuf.FormatError, KEYS.decrypt_key,
encrypted_key, 8)
# Test for missing required library.
KEYS._GENERAL_CRYPTO_LIBRARY = 'invalid'
self.assertRaises(tuf.UnsupportedLibraryError, KEYS.decrypt_key,
encrypted_key, passphrase)
KEYS._GENERAL_CRYPTO_LIBRARY = 'pycrypto'
KEYS._GENERAL_CRYPTO_LIBRARY = default_general_library
# Run the unit tests.

View file

@ -137,6 +137,21 @@ def test_verify_rsa_signature(self):
self.assertEqual(False, crypto_keys.verify_rsa_signature(mismatched_signature,
method, public_rsa, data))
def test__decrypt(self):
# Verify that invalid encrypted file is detected.
self.assertRaises(tuf.CryptoError, crypto_keys._decrypt,
'bad encrypted file', 'password')
def test_encrypt_key(self):
# Verify that a key argument with a missing private key is rejected.
global public_rsa
self.assertRaises(tuf.FormatError, crypto_keys.encrypt_key,
public_rsa, 'password')
# Run the unit tests.
if __name__ == '__main__':

View file

@ -81,7 +81,7 @@ def test_create_rsa_signature(self):
self.assertRaises(tuf.FormatError,
pycrypto.create_rsa_signature, 123, data)
self.assertRaises(TypeError,
self.assertRaises(ValueError,
pycrypto.create_rsa_signature, '', data)
# Check for invalid 'data'.

View file

@ -436,6 +436,19 @@ def test_generate_root_metadata(self):
consistent_snapshot=False)
self.assertTrue(tuf.formats.ROOT_SCHEMA.matches(root_metadata))
root_keyids = tuf.roledb.get_role_keyids('root')
tuf.keydb._keydb_dict['default'][root_keyids[0]]['keytype'] = 'bad_keytype'
self.assertRaises(tuf.Error, repo_lib.generate_root_metadata, 1,
expires, consistent_snapshot=False)
# Reset the root key's keytype, so that we can next verify that a different
# tuf.Error exception is raised for duplicate keyids.
tuf.keydb._keydb_dict['default'][root_keyids[0]]['keytype'] = 'rsa'
# Add duplicate keyid to root's roleinfo.
tuf.roledb._roledb_dict['default']['root']['keyids'].append(root_keyids[0])
self.assertRaises(tuf.Error, repo_lib.generate_root_metadata, 1,
expires, consistent_snapshot=False)
# Test improperly formatted arguments.
self.assertRaises(tuf.FormatError, repo_lib.generate_root_metadata,
@ -519,6 +532,12 @@ def test_generate_targets_metadata(self):
self.assertTrue(len(list_targets_directory) + 1,
len(new_list_targets_directory))
# Verify that an exception is not raised if the target files already exist.
repo_lib.generate_targets_metadata(targets_directory, target_files,
version, expiration_date, delegations,
write_consistent_targets=True)
# Verify that 'targets_metadata' contains a 'custom' entry (optional)
# for 'file.txt'.
self.assertTrue('custom' in targets_metadata['targets']['file.txt'])
@ -658,20 +677,22 @@ def test_sign_metadata(self):
'keystore')
root_filename = os.path.join(metadata_path, 'root.json')
root_metadata = tuf.util.load_json_file(root_filename)['signed']
targets_filename = os.path.join(metadata_path, 'targets.json')
targets_metadata = tuf.util.load_json_file(targets_filename)['signed']
tuf.keydb.create_keydb_from_root_metadata(root_metadata)
tuf.roledb.create_roledb_from_root_metadata(root_metadata)
root_keyids = tuf.roledb.get_role_keyids('root')
targets_keyids = tuf.roledb.get_role_keyids('targets')
root_private_keypath = os.path.join(keystore_path, 'root_key')
root_private_key = \
repo_lib.import_rsa_privatekey_from_file(root_private_keypath, 'password')
# Sign with a valid, but not a threshold, key.
targets_private_keypath = os.path.join(keystore_path, 'targets_key')
targets_private_key = \
repo_lib.import_ed25519_privatekey_from_file(targets_private_keypath,
'password')
targets_public_keypath = os.path.join(keystore_path, 'targets_key.pub')
targets_public_key = \
repo_lib.import_ed25519_publickey_from_file(targets_public_keypath)
# sign_metadata() expects the private key 'root_metadata' to be in
# 'tuf.keydb'. Remove any public keys that may be loaded before
@ -679,17 +700,23 @@ def test_sign_metadata(self):
# raised.
tuf.keydb.remove_key(root_private_key['keyid'])
tuf.keydb.add_key(root_private_key)
tuf.keydb.remove_key(targets_private_key['keyid'])
tuf.keydb.add_key(targets_private_key)
tuf.keydb.remove_key(targets_public_key['keyid'])
tuf.keydb.add_key(targets_public_key)
root_keyids.extend(tuf.roledb.get_role_keyids('targets'))
# Add the snapshot's public key (to test whether non-private keys are
# ignored by sign_metadata()).
root_keyids.extend(tuf.roledb.get_role_keyids('snapshot'))
# Verify that a valid root signable is generated.
root_signable = repo_lib.sign_metadata(root_metadata, root_keyids,
root_filename)
self.assertTrue(tuf.formats.SIGNABLE_SCHEMA.matches(root_signable))
# Test for an unset private key (in this case, target's).
repo_lib.sign_metadata(targets_metadata, targets_keyids,
targets_filename)
# Add an invalid keytype to one of the root keys.
root_keyid = root_keyids[0]
tuf.keydb._keydb_dict['default'][root_keyid]['keytype'] = 'bad_keytype'
self.assertRaises(tuf.Error, repo_lib.sign_metadata, root_metadata,
root_keyids, root_filename)
# Test improperly formatted arguments.
self.assertRaises(tuf.FormatError, repo_lib.sign_metadata, 3, root_keyids,
@ -720,7 +747,20 @@ def test_write_metadata_file(self):
consistent_snapshot=False)
self.assertTrue(os.path.exists(output_filename))
self.assertTrue(os.path.exists(output_filename + '.gz'))
# Attempt to over-write the previously written metadata file. An exception
# is not raised in this case, only a debug message is logged.
repo_lib.write_metadata_file(root_signable, output_filename,
version_number,
compression_algorithms,
consistent_snapshot=False)
# Test unknown compression algorithm.
self.assertRaises(tuf.FormatError, repo_lib.write_metadata_file,
root_signable, output_filename,
version_number,
compression_algorithms=['bad_algo'],
consistent_snapshot=False)
# Test improperly formatted arguments.
self.assertRaises(tuf.FormatError, repo_lib.write_metadata_file,
@ -735,8 +775,40 @@ def test_write_metadata_file(self):
self.assertRaises(tuf.FormatError, repo_lib.write_metadata_file,
root_signable, output_filename, version_number,
compression_algorithms, 3)
def test__write_compressed_metadata(self):
# Test for invalid 'compressed_filename' argument and set
# 'write_new_metadata' to False.
file_object = tuf.util.TempFile()
existing_filename = os.path.join('repository_data', 'repository',
'metadata', 'root.json')
write_new_metadata = False
repo_lib._write_compressed_metadata(file_object,
compressed_filename=existing_filename,
write_new_metadata=write_new_metadata,
consistent_snapshot=False,
version_number=8)
# Test writing of compressed metadata when consistent snapshots is enabled.
file_object = tuf.util.TempFile()
shutil.copy(existing_filename, os.path.join(self.temporary_directory, '8.root.json.gz'))
shutil.copy(existing_filename, os.path.join(self.temporary_directory, '8.root.json.zip'))
shutil.copy(existing_filename, os.path.join(self.temporary_directory, 'root.json.zip'))
compressed_filename = os.path.join(self.temporary_directory, 'root.json.gz')
# For testing purposes, add additional compression algorithms to
# repo_lib.SUPPORTED_COMPRESSION_EXTENSIONS.
repo_lib.SUPPORTED_COMPRESSION_EXTENSIONS = ['gz', 'zip', 'bz2']
repo_lib._write_compressed_metadata(file_object,
compressed_filename=compressed_filename,
write_new_metadata=True,
consistent_snapshot=True,
version_number=8)
repo_lib.SUPPORTED_COMPRESSION_EXTENSIONS = ['gz']
def test_create_tuf_client_directory(self):
# Test normal case.
@ -811,7 +883,7 @@ def test__generate_and_write_metadata(self):
repo_lib.METADATA_STAGED_DIRECTORY_NAME)
targets_metadata = os.path.join('repository_data', 'repository', 'metadata',
'targets.json')
obsolete_metadata = os.path.join(metadata_directory, 'targets',
obsolete_metadata = os.path.join(metadata_directory,
'obsolete_role.json')
tuf.util.ensure_parent_dir(obsolete_metadata)
shutil.copyfile(targets_metadata, obsolete_metadata)
@ -825,18 +897,108 @@ def test__generate_and_write_metadata(self):
tuf.formats.unix_timestamp_to_datetime(int(time.time() + 86400))
expiration = expiration.isoformat() + 'Z'
targets_roleinfo['expires'] = expiration
tuf.roledb.add_role('targets/obsolete_role', targets_roleinfo)
tuf.roledb.add_role('obsolete_role', targets_roleinfo)
repo_lib._generate_and_write_metadata('obsolete_role', obsolete_metadata,
True,
targets_directory, metadata_directory,
consistent_snapshot=False,
filenames=None,
compression_algorithms=['gz'])
snapshot_filepath = os.path.join('repository_data', 'repository',
'metadata', 'snapshot.json')
snapshot_signable = tuf.util.load_json_file(snapshot_filepath)
tuf.roledb.remove_role('targets/obsolete_role')
tuf.roledb.remove_role('obsolete_role')
self.assertTrue(os.path.exists(os.path.join(metadata_directory,
'targets/obsolete_role.json')))
'obsolete_role.json')))
tuf.repository_lib._delete_obsolete_metadata(metadata_directory,
snapshot_signable['signed'],
False)
self.assertFalse(os.path.exists(metadata_directory + 'targets/obsolete_role.json'))
self.assertFalse(os.path.exists(metadata_directory + 'obsolete_role.json'))
shutil.copyfile(targets_metadata, obsolete_metadata)
def test__delete_obsolete_metadata(self):
temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory)
repository_directory = os.path.join(temporary_directory, 'repository')
metadata_directory = os.path.join(repository_directory,
repo_lib.METADATA_STAGED_DIRECTORY_NAME)
os.makedirs(metadata_directory)
snapshot_filepath = os.path.join('repository_data', 'repository',
'metadata', 'snapshot.json')
snapshot_signable = tuf.util.load_json_file(snapshot_filepath)
# Create role metadata that should not exist in snapshot.json.
role1_filepath = os.path.join('repository_data', 'repository',
'metadata', 'role1.json')
shutil.copyfile(role1_filepath, os.path.join(metadata_directory, 'role2.json'))
repo_lib._delete_obsolete_metadata(metadata_directory,
snapshot_signable['signed'],
True)
# Verify what happens for a non-existent metadata directory (a debug message
# is logged).
repo_lib._delete_obsolete_metadata('non-existent',
snapshot_signable['signed'],
True)
def test__load_top_level_metadata(self):
tuf.roledb.clear_roledb(clear_all=True)
tuf.keydb.clear_keydb(clear_all=True)
temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory)
repository_directory = os.path.join(temporary_directory, 'repository')
metadata_directory = os.path.join(repository_directory,
repo_lib.METADATA_STAGED_DIRECTORY_NAME)
targets_directory = os.path.join(repository_directory,
repo_lib.TARGETS_DIRECTORY_NAME)
shutil.copytree(os.path.join('repository_data', 'repository', 'metadata'),
metadata_directory)
shutil.copytree(os.path.join('repository_data', 'repository', 'targets'),
targets_directory)
# Remove compressed metadata so that we can test for loading of a
# repository with no compression enabled.
for role_file in os.listdir(metadata_directory):
if role_file.endswith('.json.gz'):
role_filename = os.path.join(metadata_directory, role_file)
os.remove(role_filename)
filenames = repo_lib.get_metadata_filenames(metadata_directory)
repository = repo_tool.create_new_repository(repository_directory)
repo_lib._load_top_level_metadata(repository, filenames)
# We partially loaded 'role1' via the top-level Targets role. For the
# purposes of this test case (which only loads top-level metadata and no
# delegated metadata), remove this role to avoid issues with partially
# loaded information (e.g., missing 'version' info, signatures, etc.)
tuf.roledb.remove_role('role1')
# Partially write all top-level roles (we increase the threshold of each
# top-level role so that they are flagged as partially written.
repository.root.threshold = repository.root.threshold + 1
repository.snapshot.threshold = repository.snapshot.threshold + 1
repository.targets.threshold = repository.targets.threshold + 1
repository.timestamp.threshold = repository.timestamp.threshold + 1
repository.write(write_partial=True)
repo_lib._load_top_level_metadata(repository, filenames)
# Attempt to load a repository with missing top-level metadata.
for role_file in os.listdir(metadata_directory):
if role_file.endswith('.json') and not role_file.startswith('root'):
role_filename = os.path.join(metadata_directory, role_file)
os.remove(role_filename)
repo_lib._load_top_level_metadata(repository, filenames)
# Remove the required Root file and verify that an exception is raised.
os.remove(os.path.join(metadata_directory, 'root.json'))
self.assertRaises(tuf.RepositoryError, repo_lib._load_top_level_metadata,
repository, filenames)

View file

@ -137,7 +137,6 @@ def test_write_and_write_partial(self):
metadata_directory = os.path.join(repository_directory,
repo_tool.METADATA_STAGED_DIRECTORY_NAME)
repository = repo_tool.create_new_repository(repository_directory)
# (1) Load the public and private keys of the top-level roles, and one
# delegated role.
@ -220,7 +219,6 @@ def test_write_and_write_partial(self):
# (6) Write repository.
repository.targets.compressions = ['gz']
repository.write()
# Verify that the expected metadata is written.
for role in ['root.json', 'targets.json', 'snapshot.json', 'timestamp.json']:
@ -229,6 +227,8 @@ def test_write_and_write_partial(self):
# Raise 'tuf.FormatError' if 'role_signable' is an invalid signable.
tuf.formats.check_signable_object_format(role_signable)
self.assertTrue(os.path.exists(role_filepath))
if role == 'targets.json':
compressed_filepath = role_filepath + '.gz'
@ -242,31 +242,41 @@ def test_write_and_write_partial(self):
# Verify that an exception is *not* raised for multiple repository.write().
repository.write()
# Verify the status() does not raise an exception.
# Verify that status() does not raise an exception.
repository.status()
# Verify status() does not raise 'tuf.InsufficientKeysError' if a top-level
# role does and 'role1' do not contain a threshold of keys.
root_roleinfo = tuf.roledb.get_roleinfo('root')
old_threshold = root_roleinfo['threshold']
root_roleinfo['threshold'] = 10
# Verify that status() does not raise 'tuf.InsufficientKeysError' if a
# top-level role does not contain a threshold of keys.
targets_roleinfo = tuf.roledb.get_roleinfo('targets')
old_threshold = targets_roleinfo['threshold']
targets_roleinfo['threshold'] = 10
tuf.roledb.update_roleinfo('targets', targets_roleinfo)
repository.status()
# Restore the original threshold values.
targets_roleinfo = tuf.roledb.get_roleinfo('targets')
targets_roleinfo['threshold'] = old_threshold
tuf.roledb.update_roleinfo('targets', targets_roleinfo)
# Verify that status() does not raise 'tuf.InsufficientKeysError' if a
# delegated role does not contain a threshold of keys.
role1_roleinfo = tuf.roledb.get_roleinfo('role1')
old_role1_threshold = role1_roleinfo['threshold']
role1_roleinfo['threshold'] = 10
tuf.roledb.update_roleinfo('root', root_roleinfo)
tuf.roledb.update_roleinfo('role1', role1_roleinfo)
repository.status()
# Restore the original threshold values.
root_roleinfo['threshold'] = old_threshold
tuf.roledb.update_roleinfo('root', root_roleinfo)
role1_roleinfo['threshold'] = old_role1_threshold
# Restore role1's threshold.
role1_roleinfo = tuf.roledb.get_roleinfo('role1')
role1_roleinfo['threshold'] = old_role1_threshold
tuf.roledb.update_roleinfo('role1', role1_roleinfo)
# Verify status() does not raise 'tuf.UnsignedMetadataError' if any of the
# the top-level roles and 'role1' are improperly signed.
# the top-level roles. Test that 'root' is improperly signed.
repository.root.unload_signing_key(root_privkey)
repository.root.load_signing_key(targets_privkey)
repository.status()
repository.targets('role1').unload_signing_key(role1_privkey)
repository.targets('role1').load_signing_key(targets_privkey)
repository.status()
@ -324,8 +334,14 @@ def test_write_and_write_partial(self):
repository.root.load_signing_key(root_privkey)
repository.snapshot.load_signing_key(snapshot_privkey)
# Verify that a consistent snapshot can be written and loaded.
# Verify that a consistent snapshot can be written and loaded. The
# 'targets' and 'role1' roles must be be marked as dirty, otherwise
# write() will not create consistent snapshots for them.
repository.mark_dirty(['targets', 'role1'])
repository.write(consistent_snapshot=True)
# Verify that the newly written consistent snapshot can be loaded
# successfully.
repo_tool.load_repository(repository_directory)
# Test improperly formatted arguments.

View file

@ -677,7 +677,30 @@ def test_get_dirty_roles(self):
# Test for improperly formatted argument.
self.assertRaises(tuf.FormatError, tuf.roledb.get_dirty_roles, 123)
def test_mark_dirty(self):
# Add a dirty role to roledb.
rolename = 'targets'
roleinfo1 = {'keyids': ['123'], 'threshold': 1}
tuf.roledb.add_role(rolename, roleinfo1)
rolename2 = 'dirty_role'
roleinfo2 = {'keyids': ['123'], 'threshold': 2}
mark_role_as_dirty = True
tuf.roledb.update_roleinfo(rolename, roleinfo1, mark_role_as_dirty)
# Note: The 'default' repository is searched if the repository name is
# not given to get_dirty_roles().
self.assertEqual([rolename], tuf.roledb.get_dirty_roles())
tuf.roledb.mark_dirty(['dirty_role'])
self.assertEqual([rolename2, rolename], sorted(tuf.roledb.get_dirty_roles()))
# Verify that a role cannot be marked as dirty for a non-existent
# repository.
self.assertRaises(tuf.InvalidNameError, tuf.roledb.mark_dirty,
['dirty_role'], 'non-existent')
def _test_rolename(self, test_function):

View file

@ -287,11 +287,6 @@ def write(self, write_partial=False):
self._targets_directory, self.keys, self._prefix,
self.threshold, self.layout_type,
self._project_name)
# Delete the metadata of roles no longer in 'tuf.roledb'. Obsolete roles
# may have been revoked.
_delete_obsolete_metadata(self._metadata_directory,
project_signable['signed'], False)

View file

@ -450,6 +450,12 @@
key_schema = RELPATH_SCHEMA,
value_schema = CUSTOM_SCHEMA)
# Command argument list, as used by the CLI tool.
# Example: {'keytype': ed25519, 'expires': 365,}
COMMAND_SCHEMA = SCHEMA.DictOf(
key_schema = NAME_SCHEMA,
value_schema = SCHEMA.Any())
# tuf.roledb
ROLEDB_SCHEMA = SCHEMA.Object(
object_name = 'ROLEDB_SCHEMA',
@ -986,7 +992,8 @@ def make_signable(object):
"""
if not isinstance(object, dict) or 'signed' not in object:
return { 'signed' : object, 'signatures' : [] }
return {'signed': object, 'signatures': []}
else:
return object

View file

@ -1444,8 +1444,8 @@ def create_rsa_encrypted_pem(private_key, passphrase):
# Does 'passphrase' have the correct format?
tuf.formats.PASSWORD_SCHEMA.check_match(passphrase)
# Raise 'tuf.UnsupportedLibraryError' if the following libraries, specified in
# 'tuf.conf', are unsupported or unavailable:
# Raise 'tuf.UnsupportedLibraryError' if the following libraries, specified
# in 'tuf.conf', are unsupported or unavailable:
# 'tuf.conf.GENERAL_CRYPTO_LIBRARY' and 'tuf.conf.RSA_CRYPTO_LIBRARY'.
check_crypto_libraries(['rsa', 'general'])

View file

@ -253,7 +253,7 @@ def create_rsa_signature(private_key, data):
<Exceptions>
tuf.FormatError, if 'private_key' is improperly formatted.
TypeError, if 'private_key' is unset.
ValueError, if 'private_key' is unset.
tuf.CryptoError, if the signature cannot be generated.
@ -316,7 +316,7 @@ def create_rsa_signature(private_key, data):
raise tuf.CryptoError('An RSA signature cannot be generated: ' + str(e))
else:
raise TypeError('The required private key is unset.')
raise ValueError('The required private key is unset.')
return signature, method

View file

@ -99,7 +99,7 @@
SUPPORTED_COMPRESSION_EXTENSIONS = ['.gz']
# The full list of supported TUF metadata extensions.
METADATA_EXTENSIONS = ['.json']
METADATA_EXTENSIONS = ['.json.gz', '.json']
def _generate_and_write_metadata(rolename, metadata_filename, write_partial,
@ -209,7 +209,7 @@ def _generate_and_write_metadata(rolename, metadata_filename, write_partial,
filename = write_metadata_file(signable, metadata_filename,
metadata['version'], compression_algorithms,
consistent_snapshot)
# The root and timestamp files should also be written without a version
# number prepended if 'consistent_snaptshot' is True. Clients may request
# a timestamp and root file without knowing their version numbers.
@ -341,7 +341,6 @@ def _check_role_keys(rolename):
"""
Non-public function that verifies the public and signing keys of 'rolename'.
If either contain an invalid threshold of keys, raise an exception.
'rolename' is the full rolename (e.g., 'targets/unclaimed/django').
"""
# Extract the total number of public and private keys of 'rolename' from its
@ -354,15 +353,13 @@ def _check_role_keys(rolename):
# Raise an exception for an invalid threshold of public keys.
if total_keyids < threshold:
message = repr(rolename) + ' role contains ' + \
repr(total_keyids) + ' / ' + repr(threshold) + ' public keys.'
raise tuf.InsufficientKeysError(message)
raise tuf.InsufficientKeysError(repr(rolename) + ' role contains'
' ' + repr(total_keyids) + ' / ' + repr(threshold) + ' public keys.')
# Raise an exception for an invalid threshold of signing keys.
if total_signatures == 0 and total_signing_keys < threshold:
message = repr(rolename) + ' role contains ' + \
repr(total_signing_keys) + ' / ' + repr(threshold) + ' signing keys.'
raise tuf.InsufficientKeysError(message)
raise tuf.InsufficientKeysError(repr(rolename) + ' role contains'
' ' + repr(total_signing_keys) + ' / ' + repr(threshold) + ' signing keys.')
@ -422,48 +419,63 @@ def _delete_obsolete_metadata(metadata_directory, snapshot_metadata,
'repository_tool.py'. Revoked metadata files are not actually deleted until
this function is called. Obsolete metadata should *not* be retained in
"metadata.staged", otherwise they may be re-loaded by 'load_repository()'.
Note: Obsolete metadata may not always be easily detected (by inspecting
top-level metadata during loading) due to partial metadata and top-level
metadata that have not been written yet.
"""
# Walk the repository's metadata 'targets' sub-directory, where all the
# metadata of delegated roles is stored.
targets_metadata = os.path.join(metadata_directory, 'targets')
# The 'targets.json' metadata is not visited, only its child delegations.
# The 'targets/unclaimed/django.json' role would be located in the
# '{repository_directory}/metadata/targets/unclaimed/' directory.
if os.path.exists(targets_metadata) and os.path.isdir(targets_metadata):
for directory_path, junk_directories, files in os.walk(targets_metadata):
# Walk the repository's metadata sub-directory, which is where all metadata
# is stored (including delegated roles). The 'django.json' role (e.g.,
# delegated by Targets) would be located in the
# '{repository_directory}/metadata/' directory.
if os.path.exists(metadata_directory) and os.path.isdir(metadata_directory):
for directory_path, junk_directories, files in os.walk(metadata_directory):
# 'files' here is a list of target file names.
for basename in files:
metadata_path = os.path.join(directory_path, basename)
# Strip the metadata dirname and the leading path separator.
# '{repository_directory}/metadata/targets/unclaimed/django.json' -->
# 'targets/unclaimed/django.json'
# '{repository_directory}/metadata/django.json' -->
# 'django.json'
metadata_name = \
metadata_path[len(metadata_directory):].lstrip(os.path.sep)
# Strip the version number if 'consistent_snapshot' is True. Example:
# '10.django.json' --> 'django.json'. Consistent and non-consistent
# '10.django.json' --> 'django.json'. Consistent and non-consistent
# metadata might co-exist if write() and
# write(consistent_snapshot=True) are mixed, so ensure only
# '<version_number>.filename' metadata is stripped.
embedded_version_number = None
if metadata_name not in snapshot_metadata['meta']:
# Should we check if 'consistent_snapshot' is True? It might have been
# set previously, but 'consistent_snapshot' can potentially be False
# now. We'll proceed with the understanding that 'metadata_name' can
# have a prepended version number even though the repository is now
# a non-consistent one.
if metadata_name not in snapshot_metadata['meta']:
metadata_name, embedded_version_number = \
_strip_version_number(metadata_name, consistent_snapshot)
else:
logger.debug(repr(metadata_name) + ' found in the snapshot role.')
# Strip filename extensions. The role database does not include the
# metadata extension.
metadata_name_extension = metadata_name
for metadata_extension in METADATA_EXTENSIONS:
if metadata_name.endswith(metadata_extension):
metadata_name = metadata_name[:-len(metadata_extension)]
break
else:
logger.debug(repr(metadata_name) + ' does not match'
' supported extension ' + repr(metadata_extension))
if metadata_name in ['root', 'targets', 'snapshot', 'timestamp']:
return
# Delete the metadata file if it does not exist in 'tuf.roledb'.
# 'repository_tool.py' might have removed 'metadata_name,'
# but its metadata file is not actually deleted yet. Do it now.
@ -471,18 +483,14 @@ def _delete_obsolete_metadata(metadata_directory, snapshot_metadata,
logger.info('Removing outdated metadata: ' + repr(metadata_path))
os.remove(metadata_path)
# Delete outdated consistent snapshots. Snapshot metadata includes the
# file extension of roles. TODO: Should we leave it up to integrators
# to remove outdated consistent snapshots?
"""
if consistent_snapshot and embedded_version_number is not None:
file_hashes = list(snapshot_metadata['meta'][metadata_name_extension] \
['hashes'].values())
if embedded_digest not in file_hashes:
logger.info('Removing outdated metadata: ' + repr(metadata_path))
os.remove(metadata_path)
"""
else:
logger.debug('Not removing metadata: ' + repr(metadata_path))
# TODO: Should we delete outdated consistent snapshots, or does it make
# more sense for integrators to remove outdated consistent snapshots?
else:
logger.debug('Metadata directory does not exist: ' + repr(metadata_directory))
@ -518,8 +526,12 @@ def _strip_version_number(metadata_filename, consistent_snapshot):
dirname, basename = os.path.split(metadata_filename)
version_number, basename = basename.split('.', 1)
stripped_metadata_filename = os.path.join(dirname, basename)
return stripped_metadata_filename, version_number
if not version_number.isdigit():
return metadata_filename, ''
else:
return stripped_metadata_filename, version_number
else:
return metadata_filename, ''
@ -530,7 +542,7 @@ def _strip_version_number(metadata_filename, consistent_snapshot):
def _load_top_level_metadata(repository, top_level_filenames):
"""
Load the metadata of the Root, Timestamp, Targets, and Snapshot roles. At a
minimum, the Root role must exist and successfully load.
minimum, the Root role must exist and load successfully.
"""
root_filename = top_level_filenames[ROOT_FILENAME]
@ -546,6 +558,7 @@ def _load_top_level_metadata(repository, top_level_filenames):
# Load 'root.json'. A Root role file without a version number is always
# written.
if os.path.exists(root_filename):
# Initialize the key and role metadata of the top-level roles.
signable = tuf.util.load_json_file(root_filename)
tuf.formats.check_signable_object_format(signable)
@ -559,9 +572,16 @@ def _load_top_level_metadata(repository, top_level_filenames):
for signature in signable['signatures']:
if signature not in roleinfo['signatures']:
roleinfo['signatures'].append(signature)
else:
logger.debug('Found a Root signature that is already loaded:'
' ' + repr(signature))
if os.path.exists(root_filename + '.gz'):
roleinfo['compressions'].append('gz')
else:
logger.debug('A compressed Root file was not found.')
# By default, roleinfo['partial_loaded'] of top-level roles should be set
# to False in 'create_roledb_from_root_metadata()'. Update this field, if
@ -569,6 +589,9 @@ def _load_top_level_metadata(repository, top_level_filenames):
if _metadata_is_partially_loaded('root', signable, roleinfo):
roleinfo['partial_loaded'] = True
else:
logger.debug('Root was not partially loaded.')
_log_warning_if_expires_soon(ROOT_FILENAME, roleinfo['expires'],
ROOT_EXPIRES_WARN_SECONDS)
@ -578,8 +601,8 @@ def _load_top_level_metadata(repository, top_level_filenames):
consistent_snapshot = root_metadata['consistent_snapshot']
else:
message = 'Cannot load the required root file: ' + repr(root_filename)
raise tuf.RepositoryError(message)
raise tuf.RepositoryError('Cannot load the required root file:'
' ' + repr(root_filename))
# Load 'timestamp.json'. A Timestamp role file without a version number is
# always written.
@ -595,9 +618,15 @@ def _load_top_level_metadata(repository, top_level_filenames):
roleinfo['version'] = timestamp_metadata['version']
if os.path.exists(timestamp_filename + '.gz'):
roleinfo['compressions'].append('gz')
else:
logger.debug('A compressed Timestamp file was not found.')
if _metadata_is_partially_loaded('timestamp', signable, roleinfo):
roleinfo['partial_loaded'] = True
else:
logger.debug('The Timestamp role was not partially loaded.')
_log_warning_if_expires_soon(TIMESTAMP_FILENAME, roleinfo['expires'],
TIMESTAMP_EXPIRES_WARN_SECONDS)
@ -605,7 +634,7 @@ def _load_top_level_metadata(repository, top_level_filenames):
tuf.roledb.update_roleinfo('timestamp', roleinfo, mark_role_as_dirty=False)
else:
pass
logger.debug('Cannot load the Timestamp file: ' + repr(timestamp_filename))
# Load 'snapshot.json'. A consistent snapshot.json must be calculated if
# 'consistent_snapshot' is True.
@ -633,9 +662,15 @@ def _load_top_level_metadata(repository, top_level_filenames):
roleinfo['version'] = snapshot_metadata['version']
if os.path.exists(snapshot_filename + '.gz'):
roleinfo['compressions'].append('gz')
else:
logger.debug('A compressed Snapshot file was not loaded.')
if _metadata_is_partially_loaded('snapshot', signable, roleinfo):
roleinfo['partial_loaded'] = True
else:
logger.debug('Snapshot was not partially loaded.')
_log_warning_if_expires_soon(SNAPSHOT_FILENAME, roleinfo['expires'],
SNAPSHOT_EXPIRES_WARN_SECONDS)
@ -643,7 +678,7 @@ def _load_top_level_metadata(repository, top_level_filenames):
tuf.roledb.update_roleinfo('snapshot', roleinfo, mark_role_as_dirty=False)
else:
pass
logger.debug('The Snapshot file cannot be loaded: ' + repr(snapshot_filename))
# Load 'targets.json'. A consistent snapshot of the Targets role must be
# calculated if 'consistent_snapshot' is True.
@ -669,9 +704,15 @@ def _load_top_level_metadata(repository, top_level_filenames):
roleinfo['delegations'] = targets_metadata['delegations']
if os.path.exists(targets_filename + '.gz'):
roleinfo['compressions'].append('gz')
else:
logger.debug('Compressed Targets file cannot be loaded.')
if _metadata_is_partially_loaded('targets', signable, roleinfo):
roleinfo['partial_loaded'] = True
else:
logger.debug('Targets file was not partially loaded.')
_log_warning_if_expires_soon(TARGETS_FILENAME, roleinfo['expires'],
TARGETS_EXPIRES_WARN_SECONDS)
@ -707,7 +748,7 @@ def _load_top_level_metadata(repository, top_level_filenames):
tuf.roledb.add_role(rolename, roleinfo)
else:
pass
logger.debug('The Targets file cannot be loaded: ' + repr(targets_filename))
return repository, consistent_snapshot
@ -1442,11 +1483,11 @@ def generate_root_metadata(version, expiration_date, consistent_snapshot,
# This is not a recognized key. Raise an exception.
else:
raise tuf.Error('Unsupported keytype: '+keyid)
raise tuf.Error('Unsupported keytype: ' + keyid)
# Do we have a duplicate?
if keyid in keyids:
raise tuf.Error('Same keyid listed twice: '+keyid)
raise tuf.Error('Same keyid listed twice: ' + keyid)
# Add the loaded keyid for the role being processed.
keyids.append(keyid)
@ -1556,9 +1597,9 @@ def generate_targets_metadata(targets_directory, target_files, version,
# Ensure all target files listed in 'target_files' exist. If just one of
# these files does not exist, raise an exception.
if not os.path.exists(target_path):
message = repr(target_path) + ' cannot be read. Unable to generate ' +\
'targets metadata.'
raise tuf.Error(message)
raise tuf.Error(repr(target_path) + ' cannot be read.'
' Unable to generate targets metadata.')
# Add 'custom' if it has been provided. Custom data about the target is
# optional and will only be included in metadata (i.e., a 'custom' field in
@ -1580,7 +1621,10 @@ def generate_targets_metadata(targets_directory, target_files, version,
if not os.path.exists(digest_target):
logger.warning('Hard linking target file to ' + repr(digest_target))
os.link(target_path, digest_target)
else:
logger.debug(repr(digest_target) + ' already exists.')
# Generate the targets metadata object.
targets_metadata = tuf.formats.TargetsFile.make_metadata(version,
expiration_date,
@ -1814,23 +1858,12 @@ def sign_metadata(metadata_object, keyids, filename):
# keyid of 'keyids'.
signable = tuf.formats.make_signable(metadata_object)
# Sign the metadata with each keyid in 'keyids'.
# Sign the metadata with each keyid in 'keyids'. 'signable' should have
# zero signatures (metadata_object contained none).
for keyid in keyids:
# Load the signing key.
key = tuf.keydb.get_key(keyid)
# TODO logger.info('Signing ' + repr(filename) + ' with ' + key['keyid'])
# Create a new signature list. If 'keyid' is encountered, do not add it
# to the new list.
signatures = []
for signature in signable['signatures']:
if not keyid == signature['keyid']:
signatures.append(signature)
else:
continue
signable['signatures'] = signatures
# Generate the signature using the appropriate signing method.
if key['keytype'] in SUPPORTED_KEY_TYPES:
@ -1926,55 +1959,38 @@ def write_metadata_file(metadata, filename, version_number,
# if re-saving is required.
file_content = _get_written_metadata(metadata)
# Verify whether new metadata needs to be written (i.e., has not been
# previously written or has changed.
write_new_metadata = False
# We previously verified whether new metadata needed to be written (i.e., has
# not been previously written or has changed). It is now assumed that the
# caller intends to write changes that have been marked as dirty.
# Has the uncompressed metadata changed? Does it exist? If so, set
# 'write_compressed_version' to 'True' so that it is written.
# Compressed metadata should only be written if it does not exist or the
# uncompressed version has changed).
new_digests = {}
for hash_algorithm in tuf.conf.REPOSITORY_HASH_ALGORITHMS:
digest_object = tuf.hash.digest(hash_algorithm)
digest_object.update(file_content)
new_digests.update({hash_algorithm: digest_object.hexdigest()})
try:
file_length_junk, old_digests = tuf.util.get_file_details(written_filename)
if old_digests != new_digests:
write_new_metadata = True
# The 'metadata' object is written to 'file_object', including compressed
# versions. To avoid partial metadata from being written, 'metadata' is
# first written to a temporary location (i.e., 'file_object') and then
# moved to 'filename'.
file_object = tuf.util.TempFile()
# 'tuf.Error' raised if 'filename' does not exist.
except tuf.Error:
write_new_metadata = True
if write_new_metadata:
# The 'metadata' object is written to 'file_object', including compressed
# versions. To avoid partial metadata from being written, 'metadata' is
# first written to a temporary location (i.e., 'file_object') and then
# moved to 'filename'.
file_object = tuf.util.TempFile()
# Serialize 'metadata' to the file-like object and then write
# 'file_object' to disk. The dictionary keys of 'metadata' are sorted
# and indentation is used. The 'tuf.util.TempFile' file-like object is
# automically closed after the final move.
file_object.write(file_content)
logger.debug('Saving ' + repr(written_filename))
file_object.move(written_filename)
if consistent_snapshot:
dirname, basename = os.path.split(written_filename)
# Serialize 'metadata' to the file-like object and then write
# 'file_object' to disk. The dictionary keys of 'metadata' are sorted
# and indentation is used. The 'tuf.util.TempFile' file-like object is
# automically closed after the final move.
file_object.write(file_content)
logger.debug('Saving ' + repr(written_filename))
file_object.move(written_filename)
if consistent_snapshot:
dirname, basename = os.path.split(written_filename)
basename = basename.split(METADATA_EXTENSION, 1)[0]
version_and_filename = str(version_number) + '.' + basename + METADATA_EXTENSION
written_consistent_filename = os.path.join(dirname, version_and_filename)
basename = basename.split(METADATA_EXTENSION, 1)[0]
version_and_filename = str(version_number) + '.' + basename + METADATA_EXTENSION
written_consistent_filename = os.path.join(dirname, version_and_filename)
logger.info('Linking ' + repr(written_consistent_filename))
os.link(written_filename, written_consistent_filename)
else:
logger.info('Not linking a consistent filename for: ' + repr(written_filename))
logger.info('Linking ' + repr(written_consistent_filename))
os.link(written_filename, written_consistent_filename)
# Generate the compressed versions of 'metadata', if necessary. A compressed
# file may be written (without needing to write the uncompressed version) if
# the repository maintainer adds compression after writing the uncompressed
@ -2001,15 +2017,20 @@ def write_metadata_file(metadata, filename, version_number,
finally:
gzip_object.close()
else:
raise tuf.FormatError('Unknown compression algorithm: ' + repr(compressio_algorithm))
# This else clause should not be reached because the
# 'compression_algorithms' list is validated against the
# COMPRESSIONS_SCHEMA above.
else: # pragma: no cover
raise tuf.FormatError('Unknown compression algorithm:'
' ' + repr(compression_algorithm))
# Save the compressed version, ensuring an unchanged file is not re-saved.
# Re-saving the same compressed version may cause its digest to
# unexpectedly change (gzip includes a timestamp) even though content has
# not changed.
_write_compressed_metadata(file_object, compressed_filename,
write_new_metadata, consistent_snapshot, version_number)
True, consistent_snapshot,
version_number)
return written_filename
@ -2029,7 +2050,7 @@ def _write_compressed_metadata(file_object, compressed_filename,
# If a consistent snapshot is unneeded, 'file_object' may be simply moved
# 'compressed_filename' if not already written.
if not consistent_snapshot:
if not os.path.exists(compressed_filename) or write_new_metadata:
if write_new_metadata or not os.path.exists(compressed_filename):
file_object.move(compressed_filename)
# The temporary file must be closed if 'file_object.move()' is not used.
@ -2038,43 +2059,34 @@ def _write_compressed_metadata(file_object, compressed_filename,
else:
file_object.close_temp_file()
# Consistent snapshots = True. Ensure the file's digest is included in the
# consistent snapshots = True. Ensure the version number is included in the
# compressed filename written, provided it does not already exist.
else:
compressed_content = file_object.read()
new_digests = []
consistent_filenames = []
# Multiple snapshots may be written if the repository uses multiple
# hash algorithms. Generate the digest of the compressed content.
for hash_algorithm in tuf.conf.REPOSITORY_HASH_ALGORITHMS:
digest_object = tuf.hash.digest(hash_algorithm)
digest_object.update(compressed_content)
new_digests.append(digest_object.hexdigest())
# Attach each version number to the compressed consistent snapshot filename.
for new_digest in new_digests:
dirname, basename = os.path.split(compressed_filename)
for compression_extension in SUPPORTED_COMPRESSION_EXTENSIONS:
if basename.endswith(compression_extension):
basename = basename.split(compression_extension, 1)[0]
version_and_filename = str(version_number) + '.' + basename + compression_extension
consistent_filename = None
version_and_filename = None
# Attach the version number to the compressed, consistent snapshot filename.
dirname, basename = os.path.split(compressed_filename)
for compression_extension in SUPPORTED_COMPRESSION_EXTENSIONS:
if basename.endswith(compression_extension):
basename = basename.split(compression_extension, 1)[0]
version_and_filename = str(version_number) + '.' + basename + compression_extension
consistent_filename = os.path.join(dirname, version_and_filename)
consistent_filenames.append(os.path.join(dirname, version_and_filename))
else:
logger.debug('Skipping compression extension: ' + repr(compression_extension))
# Move the 'tuf.util.TempFile' object to one of the filenames so that it is
# saved and the temporary file closed. Any remaining consistent snapshots
# may still need to be copied or linked.
compressed_filename = consistent_filenames.pop()
if not os.path.exists(compressed_filename):
logger.info('Saving ' + repr(compressed_filename))
file_object.move(compressed_filename)
# saved and the temporary file closed.
if not os.path.exists(consistent_filename):
logger.info('Saving ' + repr(consistent_filename))
file_object.move(consistent_filename)
# Save any remaining compressed consistent snapshots.
for consistent_filename in consistent_filenames:
if not os.path.exists(consistent_filename):
logger.info('Linking ' + repr(consistent_filename))
os.link(compressed_filename, consistent_filename)
else:
logger.debug('Skipping already written compressed file:'
' ' + repr(consistent_filename))

View file

@ -240,11 +240,10 @@ def write(self, write_partial=False, consistent_snapshot=False,
'timestamp': os.path.join(self._metadata_directory, repo_lib.TIMESTAMP_FILENAME)}
snapshot_signable = None
dirty_rolenames = tuf.roledb.get_dirty_roles()
for dirty_rolename in dirty_rolenames:
# Ignore top-level roles, they will be generated later in this method.
if dirty_rolename in ['root', 'targets', 'snapshot', 'timestamp']:
continue
@ -449,7 +448,31 @@ def dirty_roles(self):
"""
logger.info('Dirty roles: ' + str(tuf.roledb.get_dirty_roles()))
def mark_dirty(self, roles):
"""
<Purpose>
Mark the list of 'roles' as dirty.
<Arguments>
roles:
A list of roles to mark as dirty. on the next write, these roles
will be written to disk.
<Exceptions>
None.
<Side Effects>
None.
<Returns>
None.
"""
tuf.roledb.mark_dirty(roles)
@staticmethod
@ -2835,7 +2858,7 @@ def load_repository(repository_directory):
targets_objects = {}
loaded_metadata = []
targets_objects['targets'] = repository.targets
for metadata_role in os.listdir(metadata_directory):
metadata_path = os.path.join(metadata_directory, metadata_role)

View file

@ -428,6 +428,45 @@ def get_dirty_roles(repository_name='default'):
def mark_dirty(roles, repository_name='default'):
"""
<Purpose>
Mark the list of 'roles' as dirty.
<Arguments>
repository_name:
The name of the repository to get the dirty roles. If not supplied, the
'default' repository is searched.
roles:
A list of roles that should be marked as dirty.
<Exceptions>
tuf.FormatError, if the arguments are improperly formatted.
tuf.InvalidNameError, if 'repository_name' does not exist in the role
database.
<Side Effects>
None.
<Returns>
None.
"""
# Are the arguments properly formatted? If not, raise tuf.FormatError.
tuf.formats.NAMES_SCHEMA.check_match(roles)
tuf.formats.NAME_SCHEMA.check_match(repository_name)
global _roledb_dict
global _dirty_roles
if repository_name not in _roledb_dict or repository_name not in _dirty_roles:
raise tuf.InvalidNameError('Repository name does not' ' exist: ' +
repository_name)
_dirty_roles[repository_name].update(roles)
def role_exists(rolename, repository_name='default'):
@ -872,6 +911,8 @@ def clear_roledb(repository_name='default', clear_all=False):
if clear_all:
_roledb_dict = {}
_roledb_dict['default'] = {}
_dirty_roles = {}
_dirty_roles['default'] = set()
return
_roledb_dict[repository_name] = {}

249
tuf/scripts/tuf.py Executable file
View file

@ -0,0 +1,249 @@
#!/usr/bin/env python
"""
<Program Name>
tuf.py
<Author>
Vladimir Diaz <vladimir.v.diaz@gmail.com>
<Started>
August 2016.
<Copyright>
See LICENSE for licensing information.
<Purpose>
Provide a command line interface to the repository tool
(i.e., tuf.repository_tool.py). This CLI removes the need to write code,
which is required by the repository and developer tools.
<Usage>
$ tuf.py --init </path/to/repo> [--consistent-snapshot=false]
$ tuf.py --gen-key <role> --keytype <keytype> --keystore </path/to/keystore> [--expires=<days>]
$ tuf.py --add <target> --repo <path/to/repo>
$ tuf.py --remove <target> --repo <path/to/repo>
$ tuf.py --snapshot <path/to/repo>
$ tuf.py --timestamp <path/to/repo>
$ tuf.py --sign <role> --repo <path/to/repo>
$ tuf.py --commit <path/to/repo>
$ tuf.py --regenerate <path/to/repo>
$ tuf.py --clean --repo
<Options>
--init
--gen-key
--add
--remove
--snapshot
--timestamp
--sign
--commit
--regenerate
--clean
--verbose:
Set the verbosity level of logging messages. Accepts values 1-5.
"""
# Help with Python 3 compatibility, where the print statement is a function, an
# implicit relative import is invalid, and the '/' operator performs true
# division. Example: print 'hello world' raises a 'SyntaxError' exception.
from __future__ import print_function
from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals
import sys
import optparse
import logging
import tuf
import tuf.log
import tuf.formats
from tuf.repository_tool import *
# See 'log.py' to learn how logging is handled in TUF.
logger = logging.getLogger('tuf.tuf')
def update_repository(repository_path, command, command_arguments):
"""
<Purpose>
Update or create the repository found in 'repository_path'. What to
update is determined by the 'command,' which can correspond to one of the
supported repository tool functions.
<Arguments>
repository_path:
command:
command_arguments:
<Exceptions>
tuf.FormatError, if any of the arugments are improperly formatted.
<Side Effects>
The TUF repository at 'repository_path' is either created or modified.
<Returns>
None.
"""
# Do the arguments have the correct format?
tuf.formats.URL_SCHEMA.check_match(repository_path)
tuf.formats.NAME_SCHEMA.check_match(command)
tuf.formats.COMMAND_SCHEMA.check_match(command_arguments)
# Set the local repository directory containing all of the metadata files.
tuf.conf.repository_directory = repository_path
if command == 'init':
repository = create_new_repository(repository_path)
# Import the root key(s).
try:
if command_arguments['keytype'] == 'ed25519':
repository.root.load_signing_key
# Write the changes to the staged repository directory.
repository.write(consistent_snapshot=command_arguments['consistent_snapshot'])
elif command = 'gen-key':
command_arguments
def parse_options():
"""
<Purpose>
Parse the command-line options and set the logging level
as specified by the user through the --verbose option.
The 'tuf' command expects the repository path to be set by the user.
Example:
$ python --init ./repository --consistent-snapshot=false --verbose 3
If a required option is unset, a parser error is printed and the scripts
exits.
<Arguments>
None.
<Exceptions>
None.
<Side Effects>
Sets the logging level for TUF logging.
<Returns>
A tuple ('options.REPOSITORY_PATH', command, command_arguments). 'command'
'command_arguments' correspond to a repository tool fuction.
"""
parser = optparse.OptionParser()
# Add the options supported by 'tuf.py' to the option parser.
parser.add_option('--verbose', dest='VERBOSE', type=int, default=2,
help='Set the verbosity level of logging messages.'
'The lower the setting, the greater the verbosity.')
parser.add_option('--init', dest='INIT', type='string', default='.',
help='')
parser.add_option('--gen-key', dest='GEN-KEY', type='string', default='.',
help='')
parser.add_option('--keytype', dest='KEYTYPE', type='string', default='ed25519',
help='')
parser.add_option('--expires', dest='EXPIRES', type=int, default=365,
help='')
parser.add_option('--add', dest='ADD', type='string', default='',
help='')
parser.add_option('--remove', dest='REMOVE', type='string', default='',
help='')
parser.add_option('--snapshot', dest='SNAPSHOT', type='string', default='.',
help='')
parser.add_option('--timestamp', dest='TIMESTAMP', type='string', default='.',
help='')
parser.add_option('--sign', dest='SIGN', type='string', default='.',
help='')
parser.add_option('--commit', dest='COMMIT', type='string', default='.',
help='')
parser.add_option('--regenerate', dest='REGENERATE', type='string', default='.',
help='')
parser.add_option('--clean', dest='CLEAN', type='string', default='.',
help='')
options, args = parser.parse_args()
# Set the logging level.
if options.VERBOSE == 5:
tuf.log.set_log_level(logging.CRITICAL)
elif options.VERBOSE == 4:
tuf.log.set_log_level(logging.ERROR)
elif options.VERBOSE == 3:
tuf.log.set_log_level(logging.WARNING)
elif options.VERBOSE == 2:
tuf.log.set_log_level(logging.INFO)
elif options.VERBOSE == 1:
tuf.log.set_log_level(logging.DEBUG)
else:
tuf.log.set_log_level(logging.NOTSET)
# Ensure the repository path was set by the user.
if options.REPOSITORY_PATH is None:
parser.error('The repository path is unknown.')
# Return a tuple containing the repository path, command, and command
# arguments needed by the repository tool.
return options.REPOSITORY_PATH, command, command_options
if __name__ == '__main__':
# Parse the options and set the logging level.
repository_path, command, command_arguments = parse_options()
# Update the repository depending on the option specified on the command
# line. For example,
# tuf.repository_tool.generate_and_write_ed25519_keypair('./path/to/keystore/root')
# is called if the user invokes:
# $ tuf --gen-key root --keystore ./path/to/keystore --keytype ed25519
try:
update_repository(repository_path, command, command_arguments)
except (tuf.Error) as e:
sys.stderr.write('Error: ' + str(e) + '\n')
sys.exit(1)
# Successfully updated the local repository.
sys.exit(0)