From 2ef5f3eb6819bbd3594e28f49fdfabdda3e3cb0c Mon Sep 17 00:00:00 2001 From: Vladimir Diaz Date: Mon, 15 Aug 2016 14:02:10 -0400 Subject: [PATCH 01/27] Add test case for _write_compressed_metadata() --- tests/test_repository_lib.py | 15 +++++++++++++++ tuf/repository_lib.py | 24 ++++++++++++++++-------- 2 files changed, 31 insertions(+), 8 deletions(-) diff --git a/tests/test_repository_lib.py b/tests/test_repository_lib.py index b4109683..e0f727d9 100755 --- a/tests/test_repository_lib.py +++ b/tests/test_repository_lib.py @@ -735,6 +735,21 @@ def test_write_metadata_file(self): self.assertRaises(tuf.FormatError, repo_lib.write_metadata_file, root_signable, output_filename, version_number, compression_algorithms, 3) + + + + def test__write_compressed_metadata(self): + # Test for invalid 'compressed_filename' argument and set + # 'write_new_metadata' to False. + file_object = tuf.util.TempFile() + non_existent_filename = \ + os.path.join(cls.temporary_directory, 'non-existent_compressed_filename') + write_new_metadata = False + repo_lib._write_compressed_metadata(file_object, + compressed_filename=non_existent_filename, + write_new_metadata=write_new_metadata, + consistent_snapshot=False, + version_number=8) diff --git a/tuf/repository_lib.py b/tuf/repository_lib.py index c0414793..cbacab7e 100755 --- a/tuf/repository_lib.py +++ b/tuf/repository_lib.py @@ -341,7 +341,6 @@ def _check_role_keys(rolename): """ Non-public function that verifies the public and signing keys of 'rolename'. If either contain an invalid threshold of keys, raise an exception. - 'rolename' is the full rolename (e.g., 'targets/unclaimed/django'). """ # Extract the total number of public and private keys of 'rolename' from its @@ -354,15 +353,13 @@ def _check_role_keys(rolename): # Raise an exception for an invalid threshold of public keys. if total_keyids < threshold: - message = repr(rolename) + ' role contains ' + \ - repr(total_keyids) + ' / ' + repr(threshold) + ' public keys.' - raise tuf.InsufficientKeysError(message) + raise tuf.InsufficientKeysError(repr(rolename) + ' role contains' + ' ' + repr(total_keyids) + ' / ' + repr(threshold) + ' public keys.') # Raise an exception for an invalid threshold of signing keys. if total_signatures == 0 and total_signing_keys < threshold: - message = repr(rolename) + ' role contains ' + \ - repr(total_signing_keys) + ' / ' + repr(threshold) + ' signing keys.' - raise tuf.InsufficientKeysError(message) + raise tuf.InsufficientKeysError(repr(rolename) + ' role contains' + ' ' + repr(total_signing_keys) + ' / ' + repr(threshold) + ' signing keys.') @@ -2059,7 +2056,10 @@ def _write_compressed_metadata(file_object, compressed_filename, if basename.endswith(compression_extension): basename = basename.split(compression_extension, 1)[0] version_and_filename = str(version_number) + '.' + basename + compression_extension - + + else: + logger.debug('Skipping unsupported compressed file: ' + repr(basename)) + consistent_filenames.append(os.path.join(dirname, version_and_filename)) # Move the 'tuf.util.TempFile' object to one of the filenames so that it is @@ -2070,12 +2070,20 @@ def _write_compressed_metadata(file_object, compressed_filename, logger.info('Saving ' + repr(compressed_filename)) file_object.move(compressed_filename) + else: + logger.debug('Skipping already written compressed file:' + ' ' + repr(compressed_filename)) + # Save any remaining compressed consistent snapshots. for consistent_filename in consistent_filenames: if not os.path.exists(consistent_filename): logger.info('Linking ' + repr(consistent_filename)) os.link(compressed_filename, consistent_filename) + else: + logger.debug('Skipping linking of already written compressed file: ' + ' ' + repr(consistent_filename)) + From 2380499029e5380aeba998fe8e3fe8b196418d5a Mon Sep 17 00:00:00 2001 From: Vladimir Diaz Date: Mon, 15 Aug 2016 14:05:05 -0400 Subject: [PATCH 02/27] Coverage for remaining lines of _log_status_of_top_level_roles() --- tests/test_repository_tool.py | 38 ++++++++++++++++++++++------------- 1 file changed, 24 insertions(+), 14 deletions(-) diff --git a/tests/test_repository_tool.py b/tests/test_repository_tool.py index 59c50dde..718d61dd 100755 --- a/tests/test_repository_tool.py +++ b/tests/test_repository_tool.py @@ -53,7 +53,7 @@ logger = logging.getLogger('tuf.test_repository_tool') -repo_tool.disable_console_log_messages() +#repo_tool.disable_console_log_messages() class TestRepository(unittest.TestCase): @@ -242,31 +242,41 @@ def test_write_and_write_partial(self): # Verify that an exception is *not* raised for multiple repository.write(). repository.write() - # Verify the status() does not raise an exception. + # Verify that status() does not raise an exception. repository.status() - # Verify status() does not raise 'tuf.InsufficientKeysError' if a top-level - # role does and 'role1' do not contain a threshold of keys. - root_roleinfo = tuf.roledb.get_roleinfo('root') - old_threshold = root_roleinfo['threshold'] - root_roleinfo['threshold'] = 10 + # Verify that status() does not raise 'tuf.InsufficientKeysError' if a + # top-level role does not contain a threshold of keys. + targets_roleinfo = tuf.roledb.get_roleinfo('targets') + old_threshold = targets_roleinfo['threshold'] + targets_roleinfo['threshold'] = 10 + tuf.roledb.update_roleinfo('targets', targets_roleinfo) + repository.status() + + # Restore the original threshold values. + targets_roleinfo = tuf.roledb.get_roleinfo('targets') + targets_roleinfo['threshold'] = old_threshold + tuf.roledb.update_roleinfo('targets', targets_roleinfo) + + # Verify that status() does not raise 'tuf.InsufficientKeysError' if a + # delegated role does not contain a threshold of keys. role1_roleinfo = tuf.roledb.get_roleinfo('role1') old_role1_threshold = role1_roleinfo['threshold'] role1_roleinfo['threshold'] = 10 - tuf.roledb.update_roleinfo('root', root_roleinfo) tuf.roledb.update_roleinfo('role1', role1_roleinfo) repository.status() - - # Restore the original threshold values. - root_roleinfo['threshold'] = old_threshold - tuf.roledb.update_roleinfo('root', root_roleinfo) - role1_roleinfo['threshold'] = old_role1_threshold + + # Restore role1's threshold. + role1_roleinfo = tuf.roledb.get_roleinfo('role1') + role1_roleinfo['threshold'] = old_role1_threshold tuf.roledb.update_roleinfo('role1', role1_roleinfo) # Verify status() does not raise 'tuf.UnsignedMetadataError' if any of the - # the top-level roles and 'role1' are improperly signed. + # the top-level roles. Test that 'root' is improperly signed. repository.root.unload_signing_key(root_privkey) repository.root.load_signing_key(targets_privkey) + repository.status() + repository.targets('role1').unload_signing_key(role1_privkey) repository.targets('role1').load_signing_key(targets_privkey) repository.status() From c8ffd6f8b17e179fe4d203521c842eda8ed3db2a Mon Sep 17 00:00:00 2001 From: Vladimir Diaz Date: Mon, 15 Aug 2016 14:26:50 -0400 Subject: [PATCH 03/27] Write 'interposition.json' to temporary location so that it gets auto-removed after testing --- tests/test_interpose_updater.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/tests/test_interpose_updater.py b/tests/test_interpose_updater.py index 6aa6b4bb..b2c489b1 100755 --- a/tests/test_interpose_updater.py +++ b/tests/test_interpose_updater.py @@ -452,8 +452,10 @@ def test_open(self): self.assertRaises(AttributeError, myUpdater.open, 8) - url = 'http://localhost:8001/targets/file1.txt' - myUpdater.open(url, 'interposition.json') + url = 'http://localhost:8001/targets/file1.txt' + interposition_file = \ + os.path.join(self.temporary_directory, 'interposition.json') + myUpdater.open(url, interposition_file) def test_retrieve(self): @@ -462,7 +464,9 @@ def test_retrieve(self): self.assertRaises(AttributeError, myUpdater.retrieve, 8) test_source_url = 'http://localhost:8001/targets/file1.txt' - myUpdater.retrieve(test_source_url, 'interposition.json') + interposition_file = \ + os.path.join(self.temporary_directory, 'interposition.json') + myUpdater.retrieve(test_source_url, interposition_file) #self.assertRaises(tuf.NoWorkingMirrorError, myUpdater.retrieve, test_source_url) From de63311a0c8356c7f1c90ad86c1edb1d7c182642 Mon Sep 17 00:00:00 2001 From: Vladimir Diaz Date: Mon, 15 Aug 2016 14:29:56 -0400 Subject: [PATCH 04/27] Reference global temporary directory variable by correct name --- tests/test_repository_lib.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_repository_lib.py b/tests/test_repository_lib.py index e0f727d9..18bb15c0 100755 --- a/tests/test_repository_lib.py +++ b/tests/test_repository_lib.py @@ -743,7 +743,7 @@ def test__write_compressed_metadata(self): # 'write_new_metadata' to False. file_object = tuf.util.TempFile() non_existent_filename = \ - os.path.join(cls.temporary_directory, 'non-existent_compressed_filename') + os.path.join(self.temporary_directory, 'non-existent_compressed_filename') write_new_metadata = False repo_lib._write_compressed_metadata(file_object, compressed_filename=non_existent_filename, From f4941a2f4ea3ec2653a108b4a7d1fd82c8e3bc53 Mon Sep 17 00:00:00 2001 From: Vladimir Diaz Date: Mon, 15 Aug 2016 14:32:06 -0400 Subject: [PATCH 05/27] Add whitespace in generate_root_metadata() --- tuf/repository_lib.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tuf/repository_lib.py b/tuf/repository_lib.py index cbacab7e..1469b81c 100755 --- a/tuf/repository_lib.py +++ b/tuf/repository_lib.py @@ -1439,11 +1439,11 @@ def generate_root_metadata(version, expiration_date, consistent_snapshot, # This is not a recognized key. Raise an exception. else: - raise tuf.Error('Unsupported keytype: '+keyid) + raise tuf.Error('Unsupported keytype: ' + keyid) # Do we have a duplicate? if keyid in keyids: - raise tuf.Error('Same keyid listed twice: '+keyid) + raise tuf.Error('Same keyid listed twice: ' + keyid) # Add the loaded keyid for the role being processed. keyids.append(keyid) From 4e8e8dca7f6bb5870f45c86a741ababadaf92ec5 Mon Sep 17 00:00:00 2001 From: Vladimir Diaz Date: Mon, 15 Aug 2016 17:13:18 -0400 Subject: [PATCH 06/27] Try to hit more lines of _write_compressed_metadata() --- tests/test_repository_lib.py | 24 ++++++++++++++++++++---- tuf/repository_lib.py | 10 +++++++--- 2 files changed, 27 insertions(+), 7 deletions(-) diff --git a/tests/test_repository_lib.py b/tests/test_repository_lib.py index 18bb15c0..1c3e96a4 100755 --- a/tests/test_repository_lib.py +++ b/tests/test_repository_lib.py @@ -721,6 +721,12 @@ def test_write_metadata_file(self): self.assertTrue(os.path.exists(output_filename)) self.assertTrue(os.path.exists(output_filename + '.gz')) + # Test unknown compression algorithm. + self.assertRaises(tuf.FormatError, repo_lib.write_metadata_file, + root_signable, output_filename, + version_number, + compression_algorithms=['bad_algo'], + consistent_snapshot=False) # Test improperly formatted arguments. self.assertRaises(tuf.FormatError, repo_lib.write_metadata_file, @@ -742,15 +748,25 @@ def test__write_compressed_metadata(self): # Test for invalid 'compressed_filename' argument and set # 'write_new_metadata' to False. file_object = tuf.util.TempFile() - non_existent_filename = \ - os.path.join(self.temporary_directory, 'non-existent_compressed_filename') - write_new_metadata = False + existing_filename = os.path.join('repository_data', 'repository', + 'metadata', 'root.json') + + write_new_metadata = False repo_lib._write_compressed_metadata(file_object, - compressed_filename=non_existent_filename, + compressed_filename=existing_filename, write_new_metadata=write_new_metadata, consistent_snapshot=False, version_number=8) + # Test for + file_object = tuf.util.TempFile() + shutil.copy(existing_filename, os.path.join(self.temporary_directory, 'root.json.gz')) + shutil.copy(existing_filename, os.path.join(self.temporary_directory, 'root.json.bz2')) + repo_lib._write_compressed_metadata(file_object, + compressed_filename=existing_filename, + write_new_metadata=True, + consistent_snapshot=True, + version_number=8) def test_create_tuf_client_directory(self): diff --git a/tuf/repository_lib.py b/tuf/repository_lib.py index 1469b81c..509b9d6c 100755 --- a/tuf/repository_lib.py +++ b/tuf/repository_lib.py @@ -1971,7 +1971,10 @@ def write_metadata_file(metadata, filename, version_number, logger.info('Linking ' + repr(written_consistent_filename)) os.link(written_filename, written_consistent_filename) - + + else: + logger.debug('Not writing new metadata.') + # Generate the compressed versions of 'metadata', if necessary. A compressed # file may be written (without needing to write the uncompressed version) if # the repository maintainer adds compression after writing the uncompressed @@ -1999,7 +2002,8 @@ def write_metadata_file(metadata, filename, version_number, gzip_object.close() else: - raise tuf.FormatError('Unknown compression algorithm: ' + repr(compressio_algorithm)) + raise tuf.FormatError('Unknown compression algorithm:' + ' ' + repr(compression_algorithm)) # Save the compressed version, ensuring an unchanged file is not re-saved. # Re-saving the same compressed version may cause its digest to @@ -2026,7 +2030,7 @@ def _write_compressed_metadata(file_object, compressed_filename, # If a consistent snapshot is unneeded, 'file_object' may be simply moved # 'compressed_filename' if not already written. if not consistent_snapshot: - if not os.path.exists(compressed_filename) or write_new_metadata: + if write_new_metadata or not os.path.exists(compressed_filename): file_object.move(compressed_filename) # The temporary file must be closed if 'file_object.move()' is not used. From 47d94c6fc4aca6ddd48fc28073f962a36841b039 Mon Sep 17 00:00:00 2001 From: Vladimir Diaz Date: Tue, 16 Aug 2016 10:40:34 -0400 Subject: [PATCH 07/27] Fix exception raised in _write_compressed_metadata() --- tests/test_repository_lib.py | 5 +++-- tuf/repository_lib.py | 5 +++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/tests/test_repository_lib.py b/tests/test_repository_lib.py index 1c3e96a4..0a530ce9 100755 --- a/tests/test_repository_lib.py +++ b/tests/test_repository_lib.py @@ -758,12 +758,13 @@ def test__write_compressed_metadata(self): consistent_snapshot=False, version_number=8) - # Test for + # Test writing of compressed metadata when consistent snapshots is enabled. file_object = tuf.util.TempFile() shutil.copy(existing_filename, os.path.join(self.temporary_directory, 'root.json.gz')) shutil.copy(existing_filename, os.path.join(self.temporary_directory, 'root.json.bz2')) + compressed_filename = os.path.join(self.temporary_directory, 'root.json.gz') repo_lib._write_compressed_metadata(file_object, - compressed_filename=existing_filename, + compressed_filename=compressed_filename, write_new_metadata=True, consistent_snapshot=True, version_number=8) diff --git a/tuf/repository_lib.py b/tuf/repository_lib.py index 509b9d6c..179eb7b4 100755 --- a/tuf/repository_lib.py +++ b/tuf/repository_lib.py @@ -2045,6 +2045,7 @@ def _write_compressed_metadata(file_object, compressed_filename, compressed_content = file_object.read() new_digests = [] consistent_filenames = [] + version_and_filename = None # Multiple snapshots may be written if the repository uses multiple # hash algorithms. Generate the digest of the compressed content. @@ -2060,16 +2061,16 @@ def _write_compressed_metadata(file_object, compressed_filename, if basename.endswith(compression_extension): basename = basename.split(compression_extension, 1)[0] version_and_filename = str(version_number) + '.' + basename + compression_extension + consistent_filenames.append(os.path.join(dirname, version_and_filename)) else: logger.debug('Skipping unsupported compressed file: ' + repr(basename)) - - consistent_filenames.append(os.path.join(dirname, version_and_filename)) # Move the 'tuf.util.TempFile' object to one of the filenames so that it is # saved and the temporary file closed. Any remaining consistent snapshots # may still need to be copied or linked. compressed_filename = consistent_filenames.pop() + if not os.path.exists(compressed_filename): logger.info('Saving ' + repr(compressed_filename)) file_object.move(compressed_filename) From 90477a0517230dd6b1f3443d8c3e61dd79a4569d Mon Sep 17 00:00:00 2001 From: Vladimir Diaz Date: Tue, 16 Aug 2016 12:20:28 -0400 Subject: [PATCH 08/27] Cover remaining test cases of _write_compressed_metadata() and remove code related to prepended digests --- tests/test_repository_lib.py | 17 +++++++-- tuf/repository_lib.py | 67 ++++++++++++++---------------------- 2 files changed, 41 insertions(+), 43 deletions(-) diff --git a/tests/test_repository_lib.py b/tests/test_repository_lib.py index 0a530ce9..9eeba202 100755 --- a/tests/test_repository_lib.py +++ b/tests/test_repository_lib.py @@ -720,6 +720,13 @@ def test_write_metadata_file(self): consistent_snapshot=False) self.assertTrue(os.path.exists(output_filename)) self.assertTrue(os.path.exists(output_filename + '.gz')) + + # Attempt to over-write the previously written metadata file. An exception + # is not raised in this case, only a debug message is logged. + repo_lib.write_metadata_file(root_signable, output_filename, + version_number, + compression_algorithms, + consistent_snapshot=False) # Test unknown compression algorithm. self.assertRaises(tuf.FormatError, repo_lib.write_metadata_file, @@ -760,14 +767,20 @@ def test__write_compressed_metadata(self): # Test writing of compressed metadata when consistent snapshots is enabled. file_object = tuf.util.TempFile() - shutil.copy(existing_filename, os.path.join(self.temporary_directory, 'root.json.gz')) - shutil.copy(existing_filename, os.path.join(self.temporary_directory, 'root.json.bz2')) + shutil.copy(existing_filename, os.path.join(self.temporary_directory, '8.root.json.gz')) + shutil.copy(existing_filename, os.path.join(self.temporary_directory, '8.root.json.zip')) + shutil.copy(existing_filename, os.path.join(self.temporary_directory, 'root.json.zip')) compressed_filename = os.path.join(self.temporary_directory, 'root.json.gz') + + # For testing purposes, add additional compression algorithms to + # repo_lib.SUPPORTED_COMPRESSION_EXTENSIONS. + repo_lib.SUPPORTED_COMPRESSION_EXTENSIONS = ['gz', 'zip', 'bz2'] repo_lib._write_compressed_metadata(file_object, compressed_filename=compressed_filename, write_new_metadata=True, consistent_snapshot=True, version_number=8) + repo_lib.SUPPORTED_COMPRESSION_EXTENSIONS = ['gz'] def test_create_tuf_client_directory(self): diff --git a/tuf/repository_lib.py b/tuf/repository_lib.py index 179eb7b4..53786451 100755 --- a/tuf/repository_lib.py +++ b/tuf/repository_lib.py @@ -1941,6 +1941,9 @@ def write_metadata_file(metadata, filename, version_number, file_length_junk, old_digests = tuf.util.get_file_details(written_filename) if old_digests != new_digests: write_new_metadata = True + + else: + logger.debug(repr(written_filename) + ' has not changed.') # 'tuf.Error' raised if 'filename' does not exist. except tuf.Error: @@ -2001,7 +2004,10 @@ def write_metadata_file(metadata, filename, version_number, finally: gzip_object.close() - else: + # This else clause should not be reached because the + # 'compression_algorithms' list is validated against the + # COMPRESSIONS_SCHEMA above. + else: # pragma: no cover raise tuf.FormatError('Unknown compression algorithm:' ' ' + repr(compression_algorithm)) @@ -2039,55 +2045,34 @@ def _write_compressed_metadata(file_object, compressed_filename, else: file_object.close_temp_file() - # Consistent snapshots = True. Ensure the file's digest is included in the + # consistent snapshots = True. Ensure the version number is included in the # compressed filename written, provided it does not already exist. else: compressed_content = file_object.read() - new_digests = [] - consistent_filenames = [] + consistent_filename = None version_and_filename = None - - # Multiple snapshots may be written if the repository uses multiple - # hash algorithms. Generate the digest of the compressed content. - for hash_algorithm in tuf.conf.REPOSITORY_HASH_ALGORITHMS: - digest_object = tuf.hash.digest(hash_algorithm) - digest_object.update(compressed_content) - new_digests.append(digest_object.hexdigest()) - - # Attach each version number to the compressed consistent snapshot filename. - for new_digest in new_digests: - dirname, basename = os.path.split(compressed_filename) - for compression_extension in SUPPORTED_COMPRESSION_EXTENSIONS: - if basename.endswith(compression_extension): - basename = basename.split(compression_extension, 1)[0] - version_and_filename = str(version_number) + '.' + basename + compression_extension - consistent_filenames.append(os.path.join(dirname, version_and_filename)) - - else: - logger.debug('Skipping unsupported compressed file: ' + repr(basename)) + + # Attach the version number to the compressed, consistent snapshot filename. + dirname, basename = os.path.split(compressed_filename) + + for compression_extension in SUPPORTED_COMPRESSION_EXTENSIONS: + if basename.endswith(compression_extension): + basename = basename.split(compression_extension, 1)[0] + version_and_filename = str(version_number) + '.' + basename + compression_extension + consistent_filename = os.path.join(dirname, version_and_filename) + + else: + logger.debug('Skipping compression extension: ' + repr(compression_extension)) # Move the 'tuf.util.TempFile' object to one of the filenames so that it is - # saved and the temporary file closed. Any remaining consistent snapshots - # may still need to be copied or linked. - compressed_filename = consistent_filenames.pop() - - if not os.path.exists(compressed_filename): - logger.info('Saving ' + repr(compressed_filename)) - file_object.move(compressed_filename) + # saved and the temporary file closed. + if not os.path.exists(consistent_filename): + logger.info('Saving ' + repr(consistent_filename)) + file_object.move(consistent_filename) else: logger.debug('Skipping already written compressed file:' - ' ' + repr(compressed_filename)) - - # Save any remaining compressed consistent snapshots. - for consistent_filename in consistent_filenames: - if not os.path.exists(consistent_filename): - logger.info('Linking ' + repr(consistent_filename)) - os.link(compressed_filename, consistent_filename) - - else: - logger.debug('Skipping linking of already written compressed file: ' - ' ' + repr(consistent_filename)) + ' ' + repr(consistent_filename)) From d312e3eeaf5c5a73cad5d878265630014cd9d05d Mon Sep 17 00:00:00 2001 From: Vladimir Diaz Date: Tue, 16 Aug 2016 13:11:48 -0400 Subject: [PATCH 09/27] Compare all digest (and cover remaining lines of write_metadata_file()) --- tuf/repository_lib.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tuf/repository_lib.py b/tuf/repository_lib.py index 53786451..bddfcc31 100755 --- a/tuf/repository_lib.py +++ b/tuf/repository_lib.py @@ -1938,7 +1938,9 @@ def write_metadata_file(metadata, filename, version_number, new_digests.update({hash_algorithm: digest_object.hexdigest()}) try: - file_length_junk, old_digests = tuf.util.get_file_details(written_filename) + file_length_junk, old_digests = \ + tuf.util.get_file_details(written_filename, + hash_algorithms=tuf.conf.REPOSITORY_HASH_ALGORITHMS) if old_digests != new_digests: write_new_metadata = True From abbda071a1f0eda7be4b3dbcae987ee0b52a3e6d Mon Sep 17 00:00:00 2001 From: Vladimir Diaz Date: Tue, 16 Aug 2016 14:32:04 -0400 Subject: [PATCH 10/27] Test for invalid keytype in sign_metadata() --- tests/test_repository_lib.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/tests/test_repository_lib.py b/tests/test_repository_lib.py index 9eeba202..1d3da244 100755 --- a/tests/test_repository_lib.py +++ b/tests/test_repository_lib.py @@ -683,13 +683,20 @@ def test_sign_metadata(self): tuf.keydb.add_key(targets_private_key) root_keyids.extend(tuf.roledb.get_role_keyids('targets')) - # Add the snapshot's public key (to test whether non-private keys are - # ignored by sign_metadata()). - root_keyids.extend(tuf.roledb.get_role_keyids('snapshot')) + + # Add the snapshot's public key (to test whether non-root keys are + # ignored by sign_metadata()). Also add an invalid keyid to 'root_keyids', + # which sign_metadata() is expected to ignore. + root_keyids.extend(tuf.roledb.get_role_keyids('snapshot')) root_signable = repo_lib.sign_metadata(root_metadata, root_keyids, root_filename) self.assertTrue(tuf.formats.SIGNABLE_SCHEMA.matches(root_signable)) + # Add an invalid keytype to one of the root keys. + root_keyid = root_keyids[0] + tuf.keydb._keydb_dict['default'][root_keyid]['keytype'] = 'bad_keytype' + self.assertRaises(tuf.Error, repo_lib.sign_metadata, root_metadata, + root_keyids, root_filename) # Test improperly formatted arguments. self.assertRaises(tuf.FormatError, repo_lib.sign_metadata, 3, root_keyids, From 6797db9d25870ea38e3513546be1fbd20aa72b2b Mon Sep 17 00:00:00 2001 From: Vladimir Diaz Date: Wed, 17 Aug 2016 09:55:25 -0400 Subject: [PATCH 11/27] Finish coverage for sign_metadata() and generate_targets_metadata() --- tests/test_repository_lib.py | 31 +++++++++++++++++++------------ tuf/formats.py | 3 ++- tuf/repository_lib.py | 26 +++++++++----------------- 3 files changed, 30 insertions(+), 30 deletions(-) diff --git a/tests/test_repository_lib.py b/tests/test_repository_lib.py index 1d3da244..36d357f8 100755 --- a/tests/test_repository_lib.py +++ b/tests/test_repository_lib.py @@ -519,6 +519,12 @@ def test_generate_targets_metadata(self): self.assertTrue(len(list_targets_directory) + 1, len(new_list_targets_directory)) + # Verify that an exception is not raised if the target files already exist. + repo_lib.generate_targets_metadata(targets_directory, target_files, + version, expiration_date, delegations, + write_consistent_targets=True) + + # Verify that 'targets_metadata' contains a 'custom' entry (optional) # for 'file.txt'. self.assertTrue('custom' in targets_metadata['targets']['file.txt']) @@ -658,20 +664,22 @@ def test_sign_metadata(self): 'keystore') root_filename = os.path.join(metadata_path, 'root.json') root_metadata = tuf.util.load_json_file(root_filename)['signed'] + targets_filename = os.path.join(metadata_path, 'targets.json') + targets_metadata = tuf.util.load_json_file(targets_filename)['signed'] tuf.keydb.create_keydb_from_root_metadata(root_metadata) tuf.roledb.create_roledb_from_root_metadata(root_metadata) root_keyids = tuf.roledb.get_role_keyids('root') + targets_keyids = tuf.roledb.get_role_keyids('targets') root_private_keypath = os.path.join(keystore_path, 'root_key') root_private_key = \ repo_lib.import_rsa_privatekey_from_file(root_private_keypath, 'password') # Sign with a valid, but not a threshold, key. - targets_private_keypath = os.path.join(keystore_path, 'targets_key') - targets_private_key = \ - repo_lib.import_ed25519_privatekey_from_file(targets_private_keypath, - 'password') + targets_public_keypath = os.path.join(keystore_path, 'targets_key.pub') + targets_public_key = \ + repo_lib.import_ed25519_publickey_from_file(targets_public_keypath) # sign_metadata() expects the private key 'root_metadata' to be in # 'tuf.keydb'. Remove any public keys that may be loaded before @@ -679,19 +687,18 @@ def test_sign_metadata(self): # raised. tuf.keydb.remove_key(root_private_key['keyid']) tuf.keydb.add_key(root_private_key) - tuf.keydb.remove_key(targets_private_key['keyid']) - tuf.keydb.add_key(targets_private_key) + tuf.keydb.remove_key(targets_public_key['keyid']) + tuf.keydb.add_key(targets_public_key) - root_keyids.extend(tuf.roledb.get_role_keyids('targets')) - - # Add the snapshot's public key (to test whether non-root keys are - # ignored by sign_metadata()). Also add an invalid keyid to 'root_keyids', - # which sign_metadata() is expected to ignore. - root_keyids.extend(tuf.roledb.get_role_keyids('snapshot')) + # Verify that a valid root signable is generated. root_signable = repo_lib.sign_metadata(root_metadata, root_keyids, root_filename) self.assertTrue(tuf.formats.SIGNABLE_SCHEMA.matches(root_signable)) + # Test for an unset private key (in this case, target's). + repo_lib.sign_metadata(targets_metadata, targets_keyids, + targets_filename) + # Add an invalid keytype to one of the root keys. root_keyid = root_keyids[0] tuf.keydb._keydb_dict['default'][root_keyid]['keytype'] = 'bad_keytype' diff --git a/tuf/formats.py b/tuf/formats.py index 0a5b785f..b81a909d 100755 --- a/tuf/formats.py +++ b/tuf/formats.py @@ -986,7 +986,8 @@ def make_signable(object): """ if not isinstance(object, dict) or 'signed' not in object: - return { 'signed' : object, 'signatures' : [] } + return {'signed': object, 'signatures': []} + else: return object diff --git a/tuf/repository_lib.py b/tuf/repository_lib.py index bddfcc31..9249b3a8 100755 --- a/tuf/repository_lib.py +++ b/tuf/repository_lib.py @@ -1553,9 +1553,9 @@ def generate_targets_metadata(targets_directory, target_files, version, # Ensure all target files listed in 'target_files' exist. If just one of # these files does not exist, raise an exception. if not os.path.exists(target_path): - message = repr(target_path) + ' cannot be read. Unable to generate ' +\ - 'targets metadata.' - raise tuf.Error(message) + raise tuf.Error(repr(target_path) + ' cannot be read.' + ' Unable to generate targets metadata.') + # Add 'custom' if it has been provided. Custom data about the target is # optional and will only be included in metadata (i.e., a 'custom' field in @@ -1577,7 +1577,10 @@ def generate_targets_metadata(targets_directory, target_files, version, if not os.path.exists(digest_target): logger.warning('Hard linking target file to ' + repr(digest_target)) os.link(target_path, digest_target) - + + else: + logger.debug(repr(digest_target) + ' already exists.') + # Generate the targets metadata object. targets_metadata = tuf.formats.TargetsFile.make_metadata(version, expiration_date, @@ -1811,23 +1814,12 @@ def sign_metadata(metadata_object, keyids, filename): # keyid of 'keyids'. signable = tuf.formats.make_signable(metadata_object) - # Sign the metadata with each keyid in 'keyids'. + # Sign the metadata with each keyid in 'keyids'. 'signable' should have + # zero signatures (metadata_object contained none). for keyid in keyids: # Load the signing key. key = tuf.keydb.get_key(keyid) - # TODO logger.info('Signing ' + repr(filename) + ' with ' + key['keyid']) - - # Create a new signature list. If 'keyid' is encountered, do not add it - # to the new list. - signatures = [] - for signature in signable['signatures']: - if not keyid == signature['keyid']: - signatures.append(signature) - - else: - continue - signable['signatures'] = signatures # Generate the signature using the appropriate signing method. if key['keytype'] in SUPPORTED_KEY_TYPES: From 0673eb6ac55bad0846f9f73906f5279268b4da87 Mon Sep 17 00:00:00 2001 From: Vladimir Diaz Date: Thu, 18 Aug 2016 08:26:53 -0400 Subject: [PATCH 12/27] Test coverage for _delete_obsolete_metadata() --- tests/test_repository_lib.py | 32 ++++++++++++++++++---- tests/test_repository_tool.py | 6 ++--- tuf/repository_lib.py | 50 +++++++++++++++++++---------------- 3 files changed, 57 insertions(+), 31 deletions(-) diff --git a/tests/test_repository_lib.py b/tests/test_repository_lib.py index 36d357f8..c4d4abe7 100755 --- a/tests/test_repository_lib.py +++ b/tests/test_repository_lib.py @@ -436,6 +436,19 @@ def test_generate_root_metadata(self): consistent_snapshot=False) self.assertTrue(tuf.formats.ROOT_SCHEMA.matches(root_metadata)) + root_keyids = tuf.roledb.get_role_keyids('root') + tuf.keydb._keydb_dict['default'][root_keyids[0]]['keytype'] = 'bad_keytype' + self.assertRaises(tuf.Error, repo_lib.generate_root_metadata, 1, + expires, consistent_snapshot=False) + + # Reset the root key's keytype, so that we can next verify that a different + # tuf.Error exception is raised for duplicate keyids. + tuf.keydb._keydb_dict['default'][root_keyids[0]]['keytype'] = 'rsa' + + # Add duplicate keyid to root's roleinfo. + tuf.roledb._roledb_dict['default']['root']['keyids'].append(root_keyids[0]) + self.assertRaises(tuf.Error, repo_lib.generate_root_metadata, 1, + expires, consistent_snapshot=False) # Test improperly formatted arguments. self.assertRaises(tuf.FormatError, repo_lib.generate_root_metadata, @@ -870,7 +883,7 @@ def test__generate_and_write_metadata(self): repo_lib.METADATA_STAGED_DIRECTORY_NAME) targets_metadata = os.path.join('repository_data', 'repository', 'metadata', 'targets.json') - obsolete_metadata = os.path.join(metadata_directory, 'targets', + obsolete_metadata = os.path.join(metadata_directory, 'obsolete_role.json') tuf.util.ensure_parent_dir(obsolete_metadata) shutil.copyfile(targets_metadata, obsolete_metadata) @@ -884,18 +897,27 @@ def test__generate_and_write_metadata(self): tuf.formats.unix_timestamp_to_datetime(int(time.time() + 86400)) expiration = expiration.isoformat() + 'Z' targets_roleinfo['expires'] = expiration - tuf.roledb.add_role('targets/obsolete_role', targets_roleinfo) + tuf.roledb.add_role('obsolete_role', targets_roleinfo) + + repo_lib._generate_and_write_metadata('obsolete_role', obsolete_metadata, + True, + targets_directory, metadata_directory, + consistent_snapshot=False, + filenames=None, + compression_algorithms=['gz']) snapshot_filepath = os.path.join('repository_data', 'repository', 'metadata', 'snapshot.json') snapshot_signable = tuf.util.load_json_file(snapshot_filepath) - tuf.roledb.remove_role('targets/obsolete_role') + tuf.roledb.remove_role('obsolete_role') self.assertTrue(os.path.exists(os.path.join(metadata_directory, - 'targets/obsolete_role.json'))) + 'obsolete_role.json'))) tuf.repository_lib._delete_obsolete_metadata(metadata_directory, snapshot_signable['signed'], False) - self.assertFalse(os.path.exists(metadata_directory + 'targets/obsolete_role.json')) + self.assertFalse(os.path.exists(metadata_directory + 'obsolete_role.json')) + shutil.copyfile(targets_metadata, obsolete_metadata) + diff --git a/tests/test_repository_tool.py b/tests/test_repository_tool.py index 718d61dd..d6e0136b 100755 --- a/tests/test_repository_tool.py +++ b/tests/test_repository_tool.py @@ -53,7 +53,7 @@ logger = logging.getLogger('tuf.test_repository_tool') -#repo_tool.disable_console_log_messages() +repo_tool.disable_console_log_messages() class TestRepository(unittest.TestCase): @@ -137,7 +137,6 @@ def test_write_and_write_partial(self): metadata_directory = os.path.join(repository_directory, repo_tool.METADATA_STAGED_DIRECTORY_NAME) repository = repo_tool.create_new_repository(repository_directory) - # (1) Load the public and private keys of the top-level roles, and one # delegated role. @@ -220,7 +219,6 @@ def test_write_and_write_partial(self): # (6) Write repository. repository.targets.compressions = ['gz'] repository.write() - # Verify that the expected metadata is written. for role in ['root.json', 'targets.json', 'snapshot.json', 'timestamp.json']: @@ -229,6 +227,8 @@ def test_write_and_write_partial(self): # Raise 'tuf.FormatError' if 'role_signable' is an invalid signable. tuf.formats.check_signable_object_format(role_signable) + + self.assertTrue(os.path.exists(role_filepath)) if role == 'targets.json': compressed_filepath = role_filepath + '.gz' diff --git a/tuf/repository_lib.py b/tuf/repository_lib.py index 9249b3a8..2ba77f54 100755 --- a/tuf/repository_lib.py +++ b/tuf/repository_lib.py @@ -424,22 +424,21 @@ def _delete_obsolete_metadata(metadata_directory, snapshot_metadata, metadata that have not been written yet. """ - # Walk the repository's metadata 'targets' sub-directory, where all the - # metadata of delegated roles is stored. - targets_metadata = os.path.join(metadata_directory, 'targets') + # Walk the repository's metadata sub-directory, which is where all metadata + # is stored (including delegated roles). The 'django.json' role (e.g., + # delegated by Targets) would be located in the + # '{repository_directory}/metadata/' directory. - # The 'targets.json' metadata is not visited, only its child delegations. - # The 'targets/unclaimed/django.json' role would be located in the - # '{repository_directory}/metadata/targets/unclaimed/' directory. - if os.path.exists(targets_metadata) and os.path.isdir(targets_metadata): - for directory_path, junk_directories, files in os.walk(targets_metadata): + # The 'targets.json' metadata is not visited, only delegated roles. + if os.path.exists(metadata_directory) and os.path.isdir(metadata_directory): + for directory_path, junk_directories, files in os.walk(metadata_directory): # 'files' here is a list of target file names. for basename in files: metadata_path = os.path.join(directory_path, basename) # Strip the metadata dirname and the leading path separator. - # '{repository_directory}/metadata/targets/unclaimed/django.json' --> - # 'targets/unclaimed/django.json' + # '{repository_directory}/metadata/django.json' --> + # 'django.json' metadata_name = \ metadata_path[len(metadata_directory):].lstrip(os.path.sep) @@ -449,17 +448,28 @@ def _delete_obsolete_metadata(metadata_directory, snapshot_metadata, # write(consistent_snapshot=True) are mixed, so ensure only # '.filename' metadata is stripped. embedded_version_number = None - - if metadata_name not in snapshot_metadata['meta']: + + # Should we check if 'consistent_snapshot' is True? It might have + # been previously, so we'll proceed with the assumption that + # 'metadata_name' might have a prepended version number. + if metadata_name not in snapshot_metadata['meta']: metadata_name, embedded_version_number = \ _strip_version_number(metadata_name, consistent_snapshot) + + else: + logger.debug(repr(metadata_name) + ' found in the snapshot role.') # Strip filename extensions. The role database does not include the # metadata extension. metadata_name_extension = metadata_name + for metadata_extension in METADATA_EXTENSIONS: if metadata_name.endswith(metadata_extension): metadata_name = metadata_name[:-len(metadata_extension)] + + else: + logger.debug(repr(metadata_name) + ' does not match' + ' supported extension ' + repr(metadata_extension)) # Delete the metadata file if it does not exist in 'tuf.roledb'. # 'repository_tool.py' might have removed 'metadata_name,' @@ -468,17 +478,11 @@ def _delete_obsolete_metadata(metadata_directory, snapshot_metadata, logger.info('Removing outdated metadata: ' + repr(metadata_path)) os.remove(metadata_path) - # Delete outdated consistent snapshots. Snapshot metadata includes the - # file extension of roles. TODO: Should we leave it up to integrators - # to remove outdated consistent snapshots? - """ - if consistent_snapshot and embedded_version_number is not None: - file_hashes = list(snapshot_metadata['meta'][metadata_name_extension] \ - ['hashes'].values()) - if embedded_digest not in file_hashes: - logger.info('Removing outdated metadata: ' + repr(metadata_path)) - os.remove(metadata_path) - """ + else: + logger.debug('Not removing metadata: ' + repr(metadata_path)) + + # TODO: Should we delete outdated consistent snapshots, or does it make + # more sense for integrators to remove outdated consistent snapshots? From 1bb4387656c33721db6b76a3decd10ebc102cff3 Mon Sep 17 00:00:00 2001 From: Vladimir Diaz Date: Thu, 25 Aug 2016 10:40:15 -0400 Subject: [PATCH 13/27] Add CLI tool. The repository tool functions remain to be called. --- tuf/tuf.py | 241 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 241 insertions(+) create mode 100755 tuf/tuf.py diff --git a/tuf/tuf.py b/tuf/tuf.py new file mode 100755 index 00000000..cb4bf6d2 --- /dev/null +++ b/tuf/tuf.py @@ -0,0 +1,241 @@ +#!/usr/bin/env python + +""" + + tuf.py + + + Vladimir Diaz + + + August 2016. + + + See LICENSE for licensing information. + + + Provide a command line interface to the repository tool + (i.e., tuf.repository_tool.py). This CLI removes the need to write code, + which is required by the repository and developer tools. + + + $ tuf.py --init [--consistent-snapshot=false] + $ tuf.py --gen-key --keytype --keystore [--expires=] + $ tuf.py --add --repo + $ tuf.py --remove --repo + $ tuf.py --snapshot + $ tuf.py --timestamp + $ tuf.py --sign --repo + $ tuf.py --commit + $ tuf.py --regenerate + $ tuf.py --clean --repo + + + --init + + --gen-key + + --add + + --remove + + --snapshot + + --timestamp + + --sign + + --commit + + --regenerate + + --clean + + --verbose: + Set the verbosity level of logging messages. Accepts values 1-5. +""" + +# Help with Python 3 compatibility, where the print statement is a function, an +# implicit relative import is invalid, and the '/' operator performs true +# division. Example: print 'hello world' raises a 'SyntaxError' exception. +from __future__ import print_function +from __future__ import absolute_import +from __future__ import division +from __future__ import unicode_literals + +import sys +import optparse +import logging + +import tuf +import tuf.log +import tuf.formats + +from tuf.repository_tool import * + +# See 'log.py' to learn how logging is handled in TUF. +logger = logging.getLogger('tuf.tuf') + + +def update_repository(repository_path, command, command_arguments): + """ + + Perform an update of the metadata and target files located at + 'repository_mirror'. Target files are saved to the 'targets' directory + in the current working directory. The current directory must already + include a 'metadata' directory, which in turn must contain the 'current' + and 'previous' directories. At a minimum, these two directories require + the 'root.json' metadata file. + + + repository_path: + The URL to the repository mirror hosting the metadata and target + files. E.g., 'http://localhost:8001' + + command: + + command_arguments: + + + + tuf.FormatError, if any of the arugments are improperly formatted. + + + The TUF repository at 'repository_path' is either created or modified. + + + None. + """ + + # Do the arguments have the correct format? + tuf.formats.URL_SCHEMA.check_match(repository_path) + tuf.formats.NAME_SCHEMA.check_match(command) + tuf.formats.NAMES_SCHEMA.check_match(command_arguments) + + # Set the local repository directory containing all of the metadata files. + tuf.conf.repository_directory = repository_path + + + +def parse_options(): + """ + + Parse the command-line options and set the logging level + as specified by the user through the --verbose option. + The 'tuf' command expects the repository path to be set by the user. + + Example: + $ python --init ./repository --consistent-snapshot=false --verbose 3 + + If a required option is unset, a parser error is printed and the scripts + exits. + + + None. + + + None. + + + Sets the logging level for TUF logging. + + + A tuple ('options.REPOSITORY_PATH', command, command_arguments). 'command' + 'command_arguments' corresponds to a repository tool fuction. + + """ + + parser = optparse.OptionParser() + + # Add the options supported by 'tuf.py' to the option parser. + parser.add_option('--verbose', dest='VERBOSE', type=int, default=2, + help='Set the verbosity level of logging messages.' + 'The lower the setting, the greater the verbosity.') + + parser.add_option('--init', dest='INIT', type='string', default='.', + help='') + + parser.add_option('--gen-key', dest='GEN-KEY', type='string', default='.', + help='') + + parser.add_option('--keytype', dest='KEYTYPE', type='string', default='ed25519', + help='') + + parser.add_option('--expires', dest='EXPIRES', type=int, default=365, + help='') + + parser.add_option('--add', dest='ADD', type='string', default='', + help='') + + parser.add_option('--remove', dest='REMOVE', type='string', default='', + help='') + + parser.add_option('--snapshot', dest='SNAPSHOT', type='string', default='.', + help='') + + parser.add_option('--timestamp', dest='TIMESTAMP', type='string', default='.', + help='') + + parser.add_option('--sign', dest='SIGN', type='string', default='.', + help='') + + parser.add_option('--commit', dest='COMMIT', type='string', default='.', + help='') + + parser.add_option('--regenerate', dest='REGENERATE', type='string', default='.', + help='') + + parser.add_option('--clean', dest='CLEAN', type='string', default='.', + help='') + + options, args = parser.parse_args() + + # Set the logging level. + if options.VERBOSE == 5: + tuf.log.set_log_level(logging.CRITICAL) + + elif options.VERBOSE == 4: + tuf.log.set_log_level(logging.ERROR) + + elif options.VERBOSE == 3: + tuf.log.set_log_level(logging.WARNING) + + elif options.VERBOSE == 2: + tuf.log.set_log_level(logging.INFO) + + elif options.VERBOSE == 1: + tuf.log.set_log_level(logging.DEBUG) + + else: + tuf.log.set_log_level(logging.NOTSET) + + # Ensure the repository path was set by the user. + if options.REPOSITORY_PATH is None: + parser.error('The repository path is unknown.') + + # Return a tuple containing the repository path, command, and command + # arguments needed by the repository tool. + return options.REPOSITORY_PATH, command, command_options + + + +if __name__ == '__main__': + + # Parse the options and set the logging level. + repository_path, command, command_arguments = parse_options() + + # Update the repository depending on the option specified on the command + # line. For example, + # tuf.repository_tool.generate_and_write_ed25519_keypair('./path/to/keystore/root') + # is called if the user invokes: + # $ tuf --gen-key root --keystore ./path/to/keystore --keytype ed25519 + + try: + update_repository(repository_path, command, command_arguments) + + except (tuf.Error) as e: + sys.stderr.write('Error: ' + str(e) + '\n') + sys.exit(1) + + # Successfully updated the local repository. + sys.exit(0) From e17550873f76086ef1044ed62fe3c0e2004ad1c6 Mon Sep 17 00:00:00 2001 From: Vladimir Diaz Date: Thu, 25 Aug 2016 14:19:19 -0400 Subject: [PATCH 14/27] Create scripts directory. Store basic_client.py and tuf.py (CLI). --- tuf/{client => scripts}/basic_client.py | 0 tuf/{ => scripts}/tuf.py | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename tuf/{client => scripts}/basic_client.py (100%) rename tuf/{ => scripts}/tuf.py (100%) diff --git a/tuf/client/basic_client.py b/tuf/scripts/basic_client.py similarity index 100% rename from tuf/client/basic_client.py rename to tuf/scripts/basic_client.py diff --git a/tuf/tuf.py b/tuf/scripts/tuf.py similarity index 100% rename from tuf/tuf.py rename to tuf/scripts/tuf.py From f9e22f0b367665e69443f55e5619cec20f28b5ba Mon Sep 17 00:00:00 2001 From: Vladimir Diaz Date: Thu, 25 Aug 2016 14:22:31 -0400 Subject: [PATCH 15/27] Add function to mark roles as dirty and update setup.py to list tuf.py --- setup.py | 3 ++- tuf/formats.py | 6 ++++++ tuf/roledb.py | 39 +++++++++++++++++++++++++++++++++++++++ tuf/scripts/tuf.py | 30 +++++++++++++++++++----------- 4 files changed, 66 insertions(+), 12 deletions(-) diff --git a/setup.py b/setup.py index c53879df..9a7001fc 100755 --- a/setup.py +++ b/setup.py @@ -112,6 +112,7 @@ packages = find_packages(exclude=['tests']), extras_require = extras, scripts = [ - 'tuf/client/basic_client.py' + 'tuf/scripts/basic_client.py' + 'tuf/scripts/tuf.py' ] ) diff --git a/tuf/formats.py b/tuf/formats.py index b81a909d..9c257bd9 100755 --- a/tuf/formats.py +++ b/tuf/formats.py @@ -450,6 +450,12 @@ key_schema = RELPATH_SCHEMA, value_schema = CUSTOM_SCHEMA) +# Command argument list, as used by the CLI tool. +# Example: {'keytype': ed25519, 'expires': 365,} +COMMAND_SCHEMA = SCHEMA.DictOf( + key_schema = NAME_SCHEMA, + value_schema = SCHEMA.Any()) + # tuf.roledb ROLEDB_SCHEMA = SCHEMA.Object( object_name = 'ROLEDB_SCHEMA', diff --git a/tuf/roledb.py b/tuf/roledb.py index 845cabd9..fe58ec5d 100755 --- a/tuf/roledb.py +++ b/tuf/roledb.py @@ -428,6 +428,45 @@ def get_dirty_roles(repository_name='default'): +def mark_dirty(roles, repository_name='default'): + """ + + Mark the list of 'roles' as dirty. + + + repository_name: + The name of the repository to get the dirty roles. If not supplied, the + 'default' repository is searched. + + roles: + A list of roles that should be marked as dirty. + + + tuf.FormatError, if the arguments are improperly formatted. + + tuf.InvalidNameError, if 'repository_name' does not exist in the role + database. + + + None. + + + None. + """ + + # Are the arguments properly formatted? If not, raise tuf.FormatError. + tuf.formats.NAMES_SCHEMA.check_match(roles) + tuf.formats.NAME_SCHEMA.check_match(repository_name) + + global _roledb_dict + global _dirty_roles + + if repository_name not in _roledb_dict or repository_name not in _dirty_roles: + raise tuf.InvalidNameError('Repository name does not' ' exist: ' + + repository_name) + + _dirty_roles[repository_name].update(roles) + def role_exists(rolename, repository_name='default'): diff --git a/tuf/scripts/tuf.py b/tuf/scripts/tuf.py index cb4bf6d2..f87de6f6 100755 --- a/tuf/scripts/tuf.py +++ b/tuf/scripts/tuf.py @@ -80,23 +80,17 @@ def update_repository(repository_path, command, command_arguments): """ - Perform an update of the metadata and target files located at - 'repository_mirror'. Target files are saved to the 'targets' directory - in the current working directory. The current directory must already - include a 'metadata' directory, which in turn must contain the 'current' - and 'previous' directories. At a minimum, these two directories require - the 'root.json' metadata file. + Update or create the repository found in 'repository_path'. What to + update is determined by the 'command,' which can correspond to one of the + supported repository tool functions. repository_path: - The URL to the repository mirror hosting the metadata and target - files. E.g., 'http://localhost:8001' command: command_arguments: - tuf.FormatError, if any of the arugments are improperly formatted. @@ -110,11 +104,25 @@ def update_repository(repository_path, command, command_arguments): # Do the arguments have the correct format? tuf.formats.URL_SCHEMA.check_match(repository_path) tuf.formats.NAME_SCHEMA.check_match(command) - tuf.formats.NAMES_SCHEMA.check_match(command_arguments) + tuf.formats.COMMAND_SCHEMA.check_match(command_arguments) # Set the local repository directory containing all of the metadata files. tuf.conf.repository_directory = repository_path + if command == 'init': + repository = create_new_repository(repository_path) + + # Import the root key(s). + try: + if command_arguments['keytype'] == 'ed25519': + repository.root.load_signing_key + + # Write the changes to the staged repository directory. + repository.write(consistent_snapshot=command_arguments['consistent_snapshot']) + + + elif command = 'gen-key': + command_arguments def parse_options(): @@ -141,7 +149,7 @@ def parse_options(): A tuple ('options.REPOSITORY_PATH', command, command_arguments). 'command' - 'command_arguments' corresponds to a repository tool fuction. + 'command_arguments' correspond to a repository tool fuction. """ From f6ec7fb578886b50aef463c3d04a4c699509d118 Mon Sep 17 00:00:00 2001 From: Vladimir Diaz Date: Thu, 25 Aug 2016 14:34:36 -0400 Subject: [PATCH 16/27] Remove _delete_obsolete_metadata() from developer_tool.py and update MANIFEST.in to list scripts. --- MANIFEST.in | 1 + tuf/developer_tool.py | 5 ----- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/MANIFEST.in b/MANIFEST.in index f015228a..db697212 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -15,6 +15,7 @@ include tuf/_vendor/ed25519/LICENSE recursive-include docs *.txt recursive-include docs/papers *.pdf recursive-include docs/images *.png +recursive-include tuf/scripts *.py recursive-include examples * recursive-include tests *.py recursive-include tests *.pem diff --git a/tuf/developer_tool.py b/tuf/developer_tool.py index bf71e452..6db9f88d 100755 --- a/tuf/developer_tool.py +++ b/tuf/developer_tool.py @@ -287,11 +287,6 @@ def write(self, write_partial=False): self._targets_directory, self.keys, self._prefix, self.threshold, self.layout_type, self._project_name) - - # Delete the metadata of roles no longer in 'tuf.roledb'. Obsolete roles - # may have been revoked. - _delete_obsolete_metadata(self._metadata_directory, - project_signable['signed'], False) From 5f358e730014ffd2117efd62176657b1c3b0ec44 Mon Sep 17 00:00:00 2001 From: Vladimir Diaz Date: Fri, 26 Aug 2016 16:38:59 -0400 Subject: [PATCH 17/27] Fix list of scripts in setup.py --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 9a7001fc..f3af28a2 100755 --- a/setup.py +++ b/setup.py @@ -112,7 +112,7 @@ packages = find_packages(exclude=['tests']), extras_require = extras, scripts = [ - 'tuf/scripts/basic_client.py' + 'tuf/scripts/basic_client.py', 'tuf/scripts/tuf.py' ] ) From 6ec7197002f351ea1026c984935f7a18e350fb2c Mon Sep 17 00:00:00 2001 From: Vladimir Diaz Date: Fri, 26 Aug 2016 16:42:21 -0400 Subject: [PATCH 18/27] Fix issues with writting non-dirty'd consistent snapshots Improve coverage for _delete_obsolete_metadata() --- tests/test_repository_lib.py | 20 ++++++ tests/test_repository_tool.py | 8 ++- tuf/repository_lib.py | 118 +++++++++++++++------------------- tuf/repository_tool.py | 29 ++++++++- 4 files changed, 105 insertions(+), 70 deletions(-) diff --git a/tests/test_repository_lib.py b/tests/test_repository_lib.py index c4d4abe7..29423693 100755 --- a/tests/test_repository_lib.py +++ b/tests/test_repository_lib.py @@ -920,6 +920,26 @@ def test__generate_and_write_metadata(self): + def test__delete_obsolete_metadata(self): + temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory) + repository_directory = os.path.join(temporary_directory, 'repository') + metadata_directory = os.path.join(repository_directory, + repo_lib.METADATA_STAGED_DIRECTORY_NAME) + os.makedirs(metadata_directory) + snapshot_filepath = os.path.join('repository_data', 'repository', + 'metadata', 'snapshot.json') + snapshot_signable = tuf.util.load_json_file(snapshot_filepath) + + # Create role metadata that should not exist in snapshot.json. + role1_filepath = os.path.join('repository_data', 'repository', + 'metadata', 'role1.json') + shutil.copyfile(role1_filepath, os.path.join(metadata_directory, 'role2.json')) + + repo_lib._delete_obsolete_metadata(metadata_directory, + snapshot_signable['signed'], + True) + + def test__remove_invalid_and_duplicate_signatures(self): # Remove duplicate PSS signatures (same key generates valid, but different diff --git a/tests/test_repository_tool.py b/tests/test_repository_tool.py index d6e0136b..1690057b 100755 --- a/tests/test_repository_tool.py +++ b/tests/test_repository_tool.py @@ -334,8 +334,14 @@ def test_write_and_write_partial(self): repository.root.load_signing_key(root_privkey) repository.snapshot.load_signing_key(snapshot_privkey) - # Verify that a consistent snapshot can be written and loaded. + # Verify that a consistent snapshot can be written and loaded. The + # 'targets' and 'role1' roles must be be marked as dirty, otherwise + # write() will not create consistent snapshots for them. + repository.mark_dirty(['targets', 'role1']) repository.write(consistent_snapshot=True) + + # Verify that the newly written consistent snapshot can be loaded + # successfully. repo_tool.load_repository(repository_directory) # Test improperly formatted arguments. diff --git a/tuf/repository_lib.py b/tuf/repository_lib.py index 2ba77f54..6080c788 100755 --- a/tuf/repository_lib.py +++ b/tuf/repository_lib.py @@ -99,7 +99,7 @@ SUPPORTED_COMPRESSION_EXTENSIONS = ['.gz'] # The full list of supported TUF metadata extensions. -METADATA_EXTENSIONS = ['.json'] +METADATA_EXTENSIONS = ['.json', '.json.gz'] def _generate_and_write_metadata(rolename, metadata_filename, write_partial, @@ -209,7 +209,7 @@ def _generate_and_write_metadata(rolename, metadata_filename, write_partial, filename = write_metadata_file(signable, metadata_filename, metadata['version'], compression_algorithms, consistent_snapshot) - + # The root and timestamp files should also be written without a version # number prepended if 'consistent_snaptshot' is True. Clients may request # a timestamp and root file without knowing their version numbers. @@ -419,6 +419,7 @@ def _delete_obsolete_metadata(metadata_directory, snapshot_metadata, 'repository_tool.py'. Revoked metadata files are not actually deleted until this function is called. Obsolete metadata should *not* be retained in "metadata.staged", otherwise they may be re-loaded by 'load_repository()'. + Note: Obsolete metadata may not always be easily detected (by inspecting top-level metadata during loading) due to partial metadata and top-level metadata that have not been written yet. @@ -428,8 +429,6 @@ def _delete_obsolete_metadata(metadata_directory, snapshot_metadata, # is stored (including delegated roles). The 'django.json' role (e.g., # delegated by Targets) would be located in the # '{repository_directory}/metadata/' directory. - - # The 'targets.json' metadata is not visited, only delegated roles. if os.path.exists(metadata_directory) and os.path.isdir(metadata_directory): for directory_path, junk_directories, files in os.walk(metadata_directory): @@ -441,17 +440,19 @@ def _delete_obsolete_metadata(metadata_directory, snapshot_metadata, # 'django.json' metadata_name = \ metadata_path[len(metadata_directory):].lstrip(os.path.sep) - + # Strip the version number if 'consistent_snapshot' is True. Example: - # '10.django.json' --> 'django.json'. Consistent and non-consistent + # '10.django.json' --> 'django.json'. Consistent and non-consistent # metadata might co-exist if write() and # write(consistent_snapshot=True) are mixed, so ensure only # '.filename' metadata is stripped. embedded_version_number = None - # Should we check if 'consistent_snapshot' is True? It might have - # been previously, so we'll proceed with the assumption that - # 'metadata_name' might have a prepended version number. + # Should we check if 'consistent_snapshot' is True? It might have been + # set previously, but 'consistent_snapshot' can potentially be False + # now. We'll proceed with the understanding that 'metadata_name' can + # have a prepended version number even though the repository is now + # a non-consistent one. if metadata_name not in snapshot_metadata['meta']: metadata_name, embedded_version_number = \ _strip_version_number(metadata_name, consistent_snapshot) @@ -466,11 +467,15 @@ def _delete_obsolete_metadata(metadata_directory, snapshot_metadata, for metadata_extension in METADATA_EXTENSIONS: if metadata_name.endswith(metadata_extension): metadata_name = metadata_name[:-len(metadata_extension)] + break else: logger.debug(repr(metadata_name) + ' does not match' ' supported extension ' + repr(metadata_extension)) - + + if metadata_name in ['root', 'targets', 'snapshot', 'timestamp']: + return + # Delete the metadata file if it does not exist in 'tuf.roledb'. # 'repository_tool.py' might have removed 'metadata_name,' # but its metadata file is not actually deleted yet. Do it now. @@ -519,8 +524,12 @@ def _strip_version_number(metadata_filename, consistent_snapshot): dirname, basename = os.path.split(metadata_filename) version_number, basename = basename.split('.', 1) stripped_metadata_filename = os.path.join(dirname, basename) - - return stripped_metadata_filename, version_number + + if not version_number.isdigit(): + return metadata_filename, '' + + else: + return stripped_metadata_filename, version_number else: return metadata_filename, '' @@ -547,6 +556,7 @@ def _load_top_level_metadata(repository, top_level_filenames): # Load 'root.json'. A Root role file without a version number is always # written. if os.path.exists(root_filename): + # Initialize the key and role metadata of the top-level roles. signable = tuf.util.load_json_file(root_filename) tuf.formats.check_signable_object_format(signable) @@ -708,7 +718,7 @@ def _load_top_level_metadata(repository, top_level_filenames): tuf.roledb.add_role(rolename, roleinfo) else: - pass + pass return repository, consistent_snapshot @@ -1919,62 +1929,37 @@ def write_metadata_file(metadata, filename, version_number, # if re-saving is required. file_content = _get_written_metadata(metadata) - # Verify whether new metadata needs to be written (i.e., has not been - # previously written or has changed. - write_new_metadata = False + # We previously verified whether new metadata needed to be written (i.e., has + # not been previously written or has changed). It is now assumed that the + # caller intends to write changes that have been marked as dirty. - # Has the uncompressed metadata changed? Does it exist? If so, set - # 'write_compressed_version' to 'True' so that it is written. - # Compressed metadata should only be written if it does not exist or the - # uncompressed version has changed). - new_digests = {} - for hash_algorithm in tuf.conf.REPOSITORY_HASH_ALGORITHMS: - digest_object = tuf.hash.digest(hash_algorithm) - digest_object.update(file_content) - new_digests.update({hash_algorithm: digest_object.hexdigest()}) - - try: - file_length_junk, old_digests = \ - tuf.util.get_file_details(written_filename, - hash_algorithms=tuf.conf.REPOSITORY_HASH_ALGORITHMS) - if old_digests != new_digests: - write_new_metadata = True - - else: - logger.debug(repr(written_filename) + ' has not changed.') + # The 'metadata' object is written to 'file_object', including compressed + # versions. To avoid partial metadata from being written, 'metadata' is + # first written to a temporary location (i.e., 'file_object') and then + # moved to 'filename'. + file_object = tuf.util.TempFile() - # 'tuf.Error' raised if 'filename' does not exist. - except tuf.Error: - write_new_metadata = True - - if write_new_metadata: - # The 'metadata' object is written to 'file_object', including compressed - # versions. To avoid partial metadata from being written, 'metadata' is - # first written to a temporary location (i.e., 'file_object') and then - # moved to 'filename'. - file_object = tuf.util.TempFile() - - # Serialize 'metadata' to the file-like object and then write - # 'file_object' to disk. The dictionary keys of 'metadata' are sorted - # and indentation is used. The 'tuf.util.TempFile' file-like object is - # automically closed after the final move. - file_object.write(file_content) - logger.debug('Saving ' + repr(written_filename)) - - file_object.move(written_filename) - - if consistent_snapshot: - dirname, basename = os.path.split(written_filename) - - basename = basename.split(METADATA_EXTENSION, 1)[0] - version_and_filename = str(version_number) + '.' + basename + METADATA_EXTENSION - written_consistent_filename = os.path.join(dirname, version_and_filename) - - logger.info('Linking ' + repr(written_consistent_filename)) - os.link(written_filename, written_consistent_filename) + # Serialize 'metadata' to the file-like object and then write + # 'file_object' to disk. The dictionary keys of 'metadata' are sorted + # and indentation is used. The 'tuf.util.TempFile' file-like object is + # automically closed after the final move. + file_object.write(file_content) + logger.debug('Saving ' + repr(written_filename)) + file_object.move(written_filename) + + if consistent_snapshot: + dirname, basename = os.path.split(written_filename) + + basename = basename.split(METADATA_EXTENSION, 1)[0] + version_and_filename = str(version_number) + '.' + basename + METADATA_EXTENSION + written_consistent_filename = os.path.join(dirname, version_and_filename) + + logger.info('Linking ' + repr(written_consistent_filename)) + os.link(written_filename, written_consistent_filename) + else: - logger.debug('Not writing new metadata.') + logger.info('Not linking a consistent filename for: ' + repr(written_filename)) # Generate the compressed versions of 'metadata', if necessary. A compressed # file may be written (without needing to write the uncompressed version) if @@ -2014,7 +1999,8 @@ def write_metadata_file(metadata, filename, version_number, # unexpectedly change (gzip includes a timestamp) even though content has # not changed. _write_compressed_metadata(file_object, compressed_filename, - write_new_metadata, consistent_snapshot, version_number) + True, consistent_snapshot, + version_number) return written_filename diff --git a/tuf/repository_tool.py b/tuf/repository_tool.py index b8587d59..f6e60557 100755 --- a/tuf/repository_tool.py +++ b/tuf/repository_tool.py @@ -240,7 +240,6 @@ def write(self, write_partial=False, consistent_snapshot=False, 'timestamp': os.path.join(self._metadata_directory, repo_lib.TIMESTAMP_FILENAME)} snapshot_signable = None - dirty_rolenames = tuf.roledb.get_dirty_roles() for dirty_rolename in dirty_rolenames: @@ -449,7 +448,31 @@ def dirty_roles(self): """ logger.info('Dirty roles: ' + str(tuf.roledb.get_dirty_roles())) - + + + + def mark_dirty(self, roles): + """ + + Mark the list of 'roles' as dirty. + + + roles: + A list of roles to mark as dirty. on the next write, these roles + will be written to disk. + + + None. + + + None. + + + None. + """ + + tuf.roledb.mark_dirty(roles) + @staticmethod @@ -2835,7 +2858,7 @@ def load_repository(repository_directory): targets_objects = {} loaded_metadata = [] targets_objects['targets'] = repository.targets - + for metadata_role in os.listdir(metadata_directory): metadata_path = os.path.join(metadata_directory, metadata_role) From d4fc091da52f94238a587ff0b2613edd331c07f8 Mon Sep 17 00:00:00 2001 From: Vladimir Diaz Date: Mon, 29 Aug 2016 11:42:19 -0400 Subject: [PATCH 19/27] Add test case for newly-added tuf.roledb.mark_dirty() --- tests/test_roledb.py | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/tests/test_roledb.py b/tests/test_roledb.py index 06a2aa17..6149dd5d 100755 --- a/tests/test_roledb.py +++ b/tests/test_roledb.py @@ -677,7 +677,30 @@ def test_get_dirty_roles(self): # Test for improperly formatted argument. self.assertRaises(tuf.FormatError, tuf.roledb.get_dirty_roles, 123) + + + + def test_mark_dirty(self): + # Add a dirty role to roledb. + rolename = 'targets' + roleinfo1 = {'keyids': ['123'], 'threshold': 1} + tuf.roledb.add_role(rolename, roleinfo1) + rolename2 = 'dirty_role' + roleinfo2 = {'keyids': ['123'], 'threshold': 2} + mark_role_as_dirty = True + tuf.roledb.update_roleinfo(rolename, roleinfo1, mark_role_as_dirty) + # Note: The 'default' repository is searched if the repository name is + # not given to get_dirty_roles(). + self.assertEqual([rolename], tuf.roledb.get_dirty_roles()) + tuf.roledb.mark_dirty(['dirty_role']) + self.assertEqual([rolename2, rolename], sorted(tuf.roledb.get_dirty_roles())) + + # Verify that a role cannot be marked as dirty for a non-existent + # repository. + self.assertRaises(tuf.InvalidNameError, tuf.roledb.mark_dirty, + ['dirty_role'], 'non-existent') + def _test_rolename(self, test_function): From c369329d9302d8430f4f129f25fe465632369ad3 Mon Sep 17 00:00:00 2001 From: Vladimir Diaz Date: Tue, 30 Aug 2016 10:56:07 -0400 Subject: [PATCH 20/27] Log debug messages for missing else statements and cover branch case in _delete_obsolete_metadata() --- tests/test_repository_lib.py | 6 ++++++ tuf/repository_lib.py | 32 ++++++++++++++++++++++++++++---- 2 files changed, 34 insertions(+), 4 deletions(-) diff --git a/tests/test_repository_lib.py b/tests/test_repository_lib.py index 29423693..a6b6a2dd 100755 --- a/tests/test_repository_lib.py +++ b/tests/test_repository_lib.py @@ -939,6 +939,12 @@ def test__delete_obsolete_metadata(self): snapshot_signable['signed'], True) + # Verify what happens for a non-existent metadata directory (a debug message + # is logged). + repo_lib._delete_obsolete_metadata('non-existent', + snapshot_signable['signed'], + True) + def test__remove_invalid_and_duplicate_signatures(self): diff --git a/tuf/repository_lib.py b/tuf/repository_lib.py index 6080c788..9ccaa288 100755 --- a/tuf/repository_lib.py +++ b/tuf/repository_lib.py @@ -99,7 +99,7 @@ SUPPORTED_COMPRESSION_EXTENSIONS = ['.gz'] # The full list of supported TUF metadata extensions. -METADATA_EXTENSIONS = ['.json', '.json.gz'] +METADATA_EXTENSIONS = ['.json.gz', '.json'] def _generate_and_write_metadata(rolename, metadata_filename, write_partial, @@ -488,7 +488,9 @@ def _delete_obsolete_metadata(metadata_directory, snapshot_metadata, # TODO: Should we delete outdated consistent snapshots, or does it make # more sense for integrators to remove outdated consistent snapshots? - + + else: + logger.debug('Metadata directory does not exist: ' + repr(metadata_directory)) @@ -570,9 +572,16 @@ def _load_top_level_metadata(repository, top_level_filenames): for signature in signable['signatures']: if signature not in roleinfo['signatures']: roleinfo['signatures'].append(signature) + + else: + logger.debug('Found a root signature that is already loaded:' + ' ' + repr(signature)) if os.path.exists(root_filename + '.gz'): roleinfo['compressions'].append('gz') + + else: + logger.debug('A compressed Root file was not found.') # By default, roleinfo['partial_loaded'] of top-level roles should be set # to False in 'create_roledb_from_root_metadata()'. Update this field, if @@ -580,6 +589,9 @@ def _load_top_level_metadata(repository, top_level_filenames): if _metadata_is_partially_loaded('root', signable, roleinfo): roleinfo['partial_loaded'] = True + else: + logger.debug('Root was not partially loaded.') + _log_warning_if_expires_soon(ROOT_FILENAME, roleinfo['expires'], ROOT_EXPIRES_WARN_SECONDS) @@ -606,9 +618,15 @@ def _load_top_level_metadata(repository, top_level_filenames): roleinfo['version'] = timestamp_metadata['version'] if os.path.exists(timestamp_filename + '.gz'): roleinfo['compressions'].append('gz') + + else: + logger.debug('A compressed Timestamp file was not found.') if _metadata_is_partially_loaded('timestamp', signable, roleinfo): roleinfo['partial_loaded'] = True + + else: + logger.debug('The Timestamp role was not partially loaded.') _log_warning_if_expires_soon(TIMESTAMP_FILENAME, roleinfo['expires'], TIMESTAMP_EXPIRES_WARN_SECONDS) @@ -616,7 +634,7 @@ def _load_top_level_metadata(repository, top_level_filenames): tuf.roledb.update_roleinfo('timestamp', roleinfo, mark_role_as_dirty=False) else: - pass + logger.debug('Cannot load the Timestamp file: ' + repr(timestamp_filename)) # Load 'snapshot.json'. A consistent snapshot.json must be calculated if # 'consistent_snapshot' is True. @@ -644,9 +662,15 @@ def _load_top_level_metadata(repository, top_level_filenames): roleinfo['version'] = snapshot_metadata['version'] if os.path.exists(snapshot_filename + '.gz'): roleinfo['compressions'].append('gz') + + else: + logger.debug('A compressed Snapshot file was not loaded.') if _metadata_is_partially_loaded('snapshot', signable, roleinfo): roleinfo['partial_loaded'] = True + + else: + logger.debug('Snapshot was not partially loaded.') _log_warning_if_expires_soon(SNAPSHOT_FILENAME, roleinfo['expires'], SNAPSHOT_EXPIRES_WARN_SECONDS) @@ -654,7 +678,7 @@ def _load_top_level_metadata(repository, top_level_filenames): tuf.roledb.update_roleinfo('snapshot', roleinfo, mark_role_as_dirty=False) else: - pass + logger.debug('The Snapshot file cannot be loaded: ' + repr(snapshot_filename)) # Load 'targets.json'. A consistent snapshot of the Targets role must be # calculated if 'consistent_snapshot' is True. From 5e2d177f4dd213a13ee3f27f01f0f782cf544afa Mon Sep 17 00:00:00 2001 From: Vladimir Diaz Date: Tue, 30 Aug 2016 17:09:03 -0400 Subject: [PATCH 21/27] Add test case for _load_top_level_metadata() and debug statements to missing else clauses --- tests/test_repository_lib.py | 55 ++++++++++++++++++++++++++++++++++++ tuf/repository_lib.py | 16 +++++++---- tuf/repository_tool.py | 2 +- tuf/roledb.py | 2 ++ 4 files changed, 69 insertions(+), 6 deletions(-) diff --git a/tests/test_repository_lib.py b/tests/test_repository_lib.py index a6b6a2dd..4d798248 100755 --- a/tests/test_repository_lib.py +++ b/tests/test_repository_lib.py @@ -946,6 +946,61 @@ def test__delete_obsolete_metadata(self): True) + def test__load_top_level_metadata(self): + tuf.roledb.clear_roledb(clear_all=True) + tuf.keydb.clear_keydb(clear_all=True) + + temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory) + repository_directory = os.path.join(temporary_directory, 'repository') + metadata_directory = os.path.join(repository_directory, + repo_lib.METADATA_STAGED_DIRECTORY_NAME) + targets_directory = os.path.join(repository_directory, + repo_lib.TARGETS_DIRECTORY_NAME) + shutil.copytree(os.path.join('repository_data', 'repository', 'metadata'), + metadata_directory) + shutil.copytree(os.path.join('repository_data', 'repository', 'targets'), + targets_directory) + + # Remove compressed metadata so that we can test for loading of a + # repository with no compression enabled. + for role_file in os.listdir(metadata_directory): + if role_file.endswith('.json.gz'): + role_filename = os.path.join(metadata_directory, role_file) + os.remove(role_filename) + + filenames = repo_lib.get_metadata_filenames(metadata_directory) + repository = repo_tool.create_new_repository(repository_directory) + repo_lib._load_top_level_metadata(repository, filenames) + + # We partially loaded 'role1' via the top-level Targets role. For the + # purposes of this test case (which only loads top-level metadata and no + # delegated metadata), remove this role to avoid issues with partially + # loaded information (e.g., missing 'version' info, signatures, etc.) + tuf.roledb.remove_role('role1') + + # Partially write all top-level roles (we increase the threshold of each + # top-level role so that they are flagged as partially written. + repository.root.threshold = repository.root.threshold + 1 + repository.snapshot.threshold = repository.snapshot.threshold + 1 + repository.targets.threshold = repository.targets.threshold + 1 + repository.timestamp.threshold = repository.timestamp.threshold + 1 + repository.write(write_partial=True) + + repo_lib._load_top_level_metadata(repository, filenames) + + # Attempt to load a repository with missing top-level metadata. + for role_file in os.listdir(metadata_directory): + if role_file.endswith('.json') and not role_file.startswith('root'): + role_filename = os.path.join(metadata_directory, role_file) + os.remove(role_filename) + repo_lib._load_top_level_metadata(repository, filenames) + + # Remove the required Root file and verify that an exception is raised. + os.remove(os.path.join(metadata_directory, 'root.json')) + self.assertRaises(tuf.RepositoryError, repo_lib._load_top_level_metadata, + repository, filenames) + + def test__remove_invalid_and_duplicate_signatures(self): # Remove duplicate PSS signatures (same key generates valid, but different diff --git a/tuf/repository_lib.py b/tuf/repository_lib.py index 9ccaa288..ae3fa760 100755 --- a/tuf/repository_lib.py +++ b/tuf/repository_lib.py @@ -542,7 +542,7 @@ def _strip_version_number(metadata_filename, consistent_snapshot): def _load_top_level_metadata(repository, top_level_filenames): """ Load the metadata of the Root, Timestamp, Targets, and Snapshot roles. At a - minimum, the Root role must exist and successfully load. + minimum, the Root role must exist and load successfully. """ root_filename = top_level_filenames[ROOT_FILENAME] @@ -574,7 +574,7 @@ def _load_top_level_metadata(repository, top_level_filenames): roleinfo['signatures'].append(signature) else: - logger.debug('Found a root signature that is already loaded:' + logger.debug('Found a Root signature that is already loaded:' ' ' + repr(signature)) if os.path.exists(root_filename + '.gz'): @@ -601,8 +601,8 @@ def _load_top_level_metadata(repository, top_level_filenames): consistent_snapshot = root_metadata['consistent_snapshot'] else: - message = 'Cannot load the required root file: ' + repr(root_filename) - raise tuf.RepositoryError(message) + raise tuf.RepositoryError('Cannot load the required root file:' + ' ' + repr(root_filename)) # Load 'timestamp.json'. A Timestamp role file without a version number is # always written. @@ -704,9 +704,15 @@ def _load_top_level_metadata(repository, top_level_filenames): roleinfo['delegations'] = targets_metadata['delegations'] if os.path.exists(targets_filename + '.gz'): roleinfo['compressions'].append('gz') + + else: + logger.debug('Compressed Targets file cannot be loaded.') if _metadata_is_partially_loaded('targets', signable, roleinfo): roleinfo['partial_loaded'] = True + + else: + logger.debug('Targets file was not partially loaded.') _log_warning_if_expires_soon(TARGETS_FILENAME, roleinfo['expires'], TARGETS_EXPIRES_WARN_SECONDS) @@ -742,7 +748,7 @@ def _load_top_level_metadata(repository, top_level_filenames): tuf.roledb.add_role(rolename, roleinfo) else: - pass + logger.debug('The Targets file cannot be loaded: ' + repr(targets_filename)) return repository, consistent_snapshot diff --git a/tuf/repository_tool.py b/tuf/repository_tool.py index f6e60557..a1d1682a 100755 --- a/tuf/repository_tool.py +++ b/tuf/repository_tool.py @@ -243,7 +243,7 @@ def write(self, write_partial=False, consistent_snapshot=False, dirty_rolenames = tuf.roledb.get_dirty_roles() for dirty_rolename in dirty_rolenames: - + # Ignore top-level roles, they will be generated later in this method. if dirty_rolename in ['root', 'targets', 'snapshot', 'timestamp']: continue diff --git a/tuf/roledb.py b/tuf/roledb.py index fe58ec5d..d0bfb4a1 100755 --- a/tuf/roledb.py +++ b/tuf/roledb.py @@ -911,6 +911,8 @@ def clear_roledb(repository_name='default', clear_all=False): if clear_all: _roledb_dict = {} _roledb_dict['default'] = {} + _dirty_roles = {} + _dirty_roles['default'] = set() return _roledb_dict[repository_name] = {} From a3c482691c707ce2825fff8dbe5b4343d969304c Mon Sep 17 00:00:00 2001 From: Vladimir Diaz Date: Wed, 31 Aug 2016 13:38:00 -0400 Subject: [PATCH 22/27] Add code coverage for pyca_cryptography via keys.py --- tests/test_keys.py | 77 +++++++++++++++++++++---------------- tests/test_pycrypto_keys.py | 2 +- tuf/pycrypto_keys.py | 4 +- 3 files changed, 47 insertions(+), 36 deletions(-) diff --git a/tests/test_keys.py b/tests/test_keys.py index f17763b2..0c116bbd 100755 --- a/tests/test_keys.py +++ b/tests/test_keys.py @@ -51,25 +51,31 @@ def setUpClass(cls): def test_generate_rsa_key(self): - _rsakey_dict = KEYS.generate_rsa_key() + default_rsa_library = KEYS._RSA_CRYPTO_LIBRARY + for rsa_crypto_library in ['pycrypto', 'pyca-cryptography']: + KEYS._RSA_CRYPTO_LIBRARY = rsa_crypto_library + + _rsakey_dict = KEYS.generate_rsa_key() - # Check if the format of the object returned by generate() corresponds - # to RSAKEY_SCHEMA format. - self.assertEqual(None, tuf.formats.RSAKEY_SCHEMA.check_match(_rsakey_dict), - FORMAT_ERROR_MSG) + # Check if the format of the object returned by generate() corresponds + # to RSAKEY_SCHEMA format. + self.assertEqual(None, tuf.formats.RSAKEY_SCHEMA.check_match(_rsakey_dict), + FORMAT_ERROR_MSG) - # Passing a bit value that is <2048 to generate() - should raise - # 'tuf.FormatError'. - self.assertRaises(tuf.FormatError, KEYS.generate_rsa_key, 555) + # Passing a bit value that is <2048 to generate() - should raise + # 'tuf.FormatError'. + self.assertRaises(tuf.FormatError, KEYS.generate_rsa_key, 555) - # Passing a string instead of integer for a bit value. - self.assertRaises(tuf.FormatError, KEYS.generate_rsa_key, 'bits') + # Passing a string instead of integer for a bit value. + self.assertRaises(tuf.FormatError, KEYS.generate_rsa_key, 'bits') - # NOTE if random bit value >=2048 (not 4096) is passed generate(bits) - # does not raise any errors and returns a valid key. - self.assertTrue(tuf.formats.RSAKEY_SCHEMA.matches(KEYS.generate_rsa_key(2048))) - self.assertTrue(tuf.formats.RSAKEY_SCHEMA.matches(KEYS.generate_rsa_key(4096))) + # NOTE if random bit value >=2048 (not 4096) is passed generate(bits) + # does not raise any errors and returns a valid key. + self.assertTrue(tuf.formats.RSAKEY_SCHEMA.matches(KEYS.generate_rsa_key(2048))) + self.assertTrue(tuf.formats.RSAKEY_SCHEMA.matches(KEYS.generate_rsa_key(4096))) + # Reset to originally set RSA crypto library. + KEYS._RSA_CRYPTO_LIBRARY = default_rsa_library def test_format_keyval_to_metadata(self): @@ -176,29 +182,34 @@ def test_helper_get_keyid(self): def test_create_signature(self): - # Creating a signature for 'DATA'. - rsa_signature = KEYS.create_signature(self.rsakey_dict, DATA) - ed25519_signature = KEYS.create_signature(self.ed25519key_dict, DATA) + default_rsa_library = KEYS._RSA_CRYPTO_LIBRARY + for rsa_crypto_library in ['pycrypto', 'pyca-cryptography']: + KEYS._RSA_CRYPTO_LIBRARY = rsa_crypto_library - # Check format of output. - self.assertEqual(None, - tuf.formats.SIGNATURE_SCHEMA.check_match(rsa_signature), - FORMAT_ERROR_MSG) - self.assertEqual(None, - tuf.formats.SIGNATURE_SCHEMA.check_match(ed25519_signature), - FORMAT_ERROR_MSG) + # Creating a signature for 'DATA'. + rsa_signature = KEYS.create_signature(self.rsakey_dict, DATA) + ed25519_signature = KEYS.create_signature(self.ed25519key_dict, DATA) + + # Check format of output. + self.assertEqual(None, + tuf.formats.SIGNATURE_SCHEMA.check_match(rsa_signature), + FORMAT_ERROR_MSG) + self.assertEqual(None, + tuf.formats.SIGNATURE_SCHEMA.check_match(ed25519_signature), + FORMAT_ERROR_MSG) - # Removing private key from 'rsakey_dict' - should raise a TypeError. - private = self.rsakey_dict['keyval']['private'] - self.rsakey_dict['keyval']['private'] = '' - - args = (self.rsakey_dict, DATA) - self.assertRaises(TypeError, KEYS.create_signature, *args) + # Removing private key from 'rsakey_dict' - should raise a TypeError. + private = self.rsakey_dict['keyval']['private'] + self.rsakey_dict['keyval']['private'] = '' + + args = (self.rsakey_dict, DATA) + self.assertRaises(ValueError, KEYS.create_signature, *args) - # Supplying an incorrect number of arguments. - self.assertRaises(TypeError, KEYS.create_signature) - self.rsakey_dict['keyval']['private'] = private + # Supplying an incorrect number of arguments. + self.assertRaises(TypeError, KEYS.create_signature) + self.rsakey_dict['keyval']['private'] = private + KEYS._RSA_CRYPTO_LIBRARY = default_rsa_library def test_verify_signature(self): diff --git a/tests/test_pycrypto_keys.py b/tests/test_pycrypto_keys.py index 8d1f6c36..9c653105 100755 --- a/tests/test_pycrypto_keys.py +++ b/tests/test_pycrypto_keys.py @@ -81,7 +81,7 @@ def test_create_rsa_signature(self): self.assertRaises(tuf.FormatError, pycrypto.create_rsa_signature, 123, data) - self.assertRaises(TypeError, + self.assertRaises(ValueError, pycrypto.create_rsa_signature, '', data) # Check for invalid 'data'. diff --git a/tuf/pycrypto_keys.py b/tuf/pycrypto_keys.py index d4d953cb..45081dc8 100755 --- a/tuf/pycrypto_keys.py +++ b/tuf/pycrypto_keys.py @@ -253,7 +253,7 @@ def create_rsa_signature(private_key, data): tuf.FormatError, if 'private_key' is improperly formatted. - TypeError, if 'private_key' is unset. + ValueError, if 'private_key' is unset. tuf.CryptoError, if the signature cannot be generated. @@ -316,7 +316,7 @@ def create_rsa_signature(private_key, data): raise tuf.CryptoError('An RSA signature cannot be generated: ' + str(e)) else: - raise TypeError('The required private key is unset.') + raise ValueError('The required private key is unset.') return signature, method From 37dcfba99945224c9a867f2398f5f41e47263379 Mon Sep 17 00:00:00 2001 From: Vladimir Diaz Date: Wed, 31 Aug 2016 15:20:02 -0400 Subject: [PATCH 23/27] Improve test coverage for keys.py and pyca_cryptography.py --- tests/test_keys.py | 39 ++++++++++++++++++++++----------------- 1 file changed, 22 insertions(+), 17 deletions(-) diff --git a/tests/test_keys.py b/tests/test_keys.py index 0c116bbd..ab9ec4fa 100755 --- a/tests/test_keys.py +++ b/tests/test_keys.py @@ -283,26 +283,31 @@ def test_create_rsa_encrypted_pem(self): def test_decrypt_key(self): - # Test valid arguments. - passphrase = 'secret' - encrypted_key = KEYS.encrypt_key(self.rsakey_dict, passphrase).encode('utf-8') - decrypted_key = KEYS.decrypt_key(encrypted_key, passphrase) - - self.assertTrue(tuf.formats.ANYKEY_SCHEMA.matches(decrypted_key)) + default_general_library = KEYS._GENERAL_CRYPTO_LIBRARY + for general_crypto_library in ['pycrypto', 'pyca-cryptography']: + KEYS._GENERAL_CRYPTO_LIBRARY = general_crypto_library - # Test improperly formatted arguments. - self.assertRaises(tuf.FormatError, KEYS.decrypt_key, - 8, passphrase) - - self.assertRaises(tuf.FormatError, KEYS.decrypt_key, - encrypted_key, 8) + # Test valid arguments. + passphrase = 'secret' + encrypted_key = KEYS.encrypt_key(self.rsakey_dict, passphrase).encode('utf-8') + decrypted_key = KEYS.decrypt_key(encrypted_key, passphrase) - # Test for missing required library. - KEYS._GENERAL_CRYPTO_LIBRARY = 'invalid' - self.assertRaises(tuf.UnsupportedLibraryError, KEYS.decrypt_key, - encrypted_key, passphrase) - KEYS._GENERAL_CRYPTO_LIBRARY = 'pycrypto' + self.assertTrue(tuf.formats.ANYKEY_SCHEMA.matches(decrypted_key)) + + # Test improperly formatted arguments. + self.assertRaises(tuf.FormatError, KEYS.decrypt_key, + 8, passphrase) + + self.assertRaises(tuf.FormatError, KEYS.decrypt_key, + encrypted_key, 8) + # Test for missing required library. + KEYS._GENERAL_CRYPTO_LIBRARY = 'invalid' + self.assertRaises(tuf.UnsupportedLibraryError, KEYS.decrypt_key, + encrypted_key, passphrase) + KEYS._GENERAL_CRYPTO_LIBRARY = 'pycrypto' + + KEYS._GENERAL_CRYPTO_LIBRARY = default_general_library # Run the unit tests. From 2a0365d332e9a4968186b5cc38f4b668ae450043 Mon Sep 17 00:00:00 2001 From: Vladimir Diaz Date: Wed, 31 Aug 2016 16:09:00 -0400 Subject: [PATCH 24/27] Add test cases for _decrypt() and encrypt_key() --- tests/test_pyca_crypto_keys.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/tests/test_pyca_crypto_keys.py b/tests/test_pyca_crypto_keys.py index 52e67034..790e0213 100755 --- a/tests/test_pyca_crypto_keys.py +++ b/tests/test_pyca_crypto_keys.py @@ -137,6 +137,21 @@ def test_verify_rsa_signature(self): self.assertEqual(False, crypto_keys.verify_rsa_signature(mismatched_signature, method, public_rsa, data)) + def test__decrypt(self): + # Verify that invalid encrypted file is detected. + self.assertRaises(tuf.CryptoError, crypto_keys._decrypt, + 'bad encrypted file', 'password') + + + + def test_encrypt_key(self): + # Verify that a key argument with a missing private key is rejected. + global public_rsa + + self.assertRaises(tuf.FormatError, crypto_keys.encrypt_key, + public_rsa, 'password') + + # Run the unit tests. if __name__ == '__main__': From 98c386083a67736162b83297ecaab9462935889e Mon Sep 17 00:00:00 2001 From: Vladimir Diaz Date: Wed, 31 Aug 2016 16:11:07 -0400 Subject: [PATCH 25/27] Improve code coverage for pyca_crypto_keys.py (functions for encrypted PEMs) --- tests/test_keys.py | 44 +++++++++++++++++++++++++++----------------- 1 file changed, 27 insertions(+), 17 deletions(-) diff --git a/tests/test_keys.py b/tests/test_keys.py index ab9ec4fa..89ced710 100755 --- a/tests/test_keys.py +++ b/tests/test_keys.py @@ -261,26 +261,36 @@ def test_verify_signature(self): def test_create_rsa_encrypted_pem(self): - # Test valid arguments. - private = self.rsakey_dict['keyval']['private'] - passphrase = 'secret' - encrypted_pem = KEYS.create_rsa_encrypted_pem(private, passphrase) - self.assertTrue(tuf.formats.PEMRSA_SCHEMA.matches(encrypted_pem)) - - # Test improperly formatted arguments. - self.assertRaises(tuf.FormatError, KEYS.create_rsa_encrypted_pem, - 8, passphrase) + default_rsa_library = KEYS._RSA_CRYPTO_LIBRARY + for rsa_crypto_library in ['pycrypto', 'pyca-cryptography']: + KEYS._RSA_CRYPTO_LIBRARY = rsa_crypto_library - self.assertRaises(tuf.FormatError, KEYS.create_rsa_encrypted_pem, - private, 8) + # Test valid arguments. + private = self.rsakey_dict['keyval']['private'] + passphrase = 'secret' + encrypted_pem = KEYS.create_rsa_encrypted_pem(private, passphrase) + self.assertTrue(tuf.formats.PEMRSA_SCHEMA.matches(encrypted_pem)) - # Test for missing required library. - KEYS._RSA_CRYPTO_LIBRARY = 'invalid' - self.assertRaises(tuf.UnsupportedLibraryError, KEYS.create_rsa_encrypted_pem, - private, passphrase) - KEYS._RSA_CRYPTO_LIBRARY = 'pycrypto' - + # Try to import the encryped PEM file. + rsakey = KEYS.import_rsakey_from_encrypted_pem(encrypted_pem, passphrase) + self.assertTrue(tuf.formats.RSAKEY_SCHEMA.matches(rsakey)) + + # Test improperly formatted arguments. + self.assertRaises(tuf.FormatError, KEYS.create_rsa_encrypted_pem, + 8, passphrase) + + self.assertRaises(tuf.FormatError, KEYS.create_rsa_encrypted_pem, + private, 8) + + # Test for missing required library. + KEYS._RSA_CRYPTO_LIBRARY = 'invalid' + self.assertRaises(tuf.UnsupportedLibraryError, KEYS.create_rsa_encrypted_pem, + private, passphrase) + KEYS._RSA_CRYPTO_LIBRARY = 'pycrypto' + KEYS._RSA_CRYPTO_LIBRARY = default_rsa_library + + def test_decrypt_key(self): default_general_library = KEYS._GENERAL_CRYPTO_LIBRARY From 5a35e797fc783585be3423fa0f24c5e40d43ad79 Mon Sep 17 00:00:00 2001 From: Vladimir Diaz Date: Thu, 1 Sep 2016 10:12:56 -0400 Subject: [PATCH 26/27] Improve code coverage for test_signature() --- tests/test_keys.py | 81 +++++++++++++++++++++++++--------------------- 1 file changed, 44 insertions(+), 37 deletions(-) diff --git a/tests/test_keys.py b/tests/test_keys.py index 89ced710..a59f79f6 100755 --- a/tests/test_keys.py +++ b/tests/test_keys.py @@ -213,51 +213,58 @@ def test_create_signature(self): def test_verify_signature(self): - # Creating a signature of 'DATA' to be verified. - rsa_signature = KEYS.create_signature(self.rsakey_dict, DATA) - ed25519_signature = KEYS.create_signature(self.ed25519key_dict, DATA) - - # Verifying the 'signature' of 'DATA'. - verified = KEYS.verify_signature(self.rsakey_dict, rsa_signature, DATA) - self.assertTrue(verified, "Incorrect signature.") + default_rsa_library = KEYS._RSA_CRYPTO_LIBRARY + default_available_libraries = KEYS._available_crypto_libraries + for rsa_crypto_library in ['pycrypto', 'pyca-cryptography']: + KEYS._RSA_CRYPTO_LIBRARY = rsa_crypto_library - # Verifying the 'ed25519_signature' of 'DATA'. - verified = KEYS.verify_signature(self.ed25519key_dict, ed25519_signature, DATA) - self.assertTrue(verified, "Incorrect signature.") + # Creating a signature of 'DATA' to be verified. + rsa_signature = KEYS.create_signature(self.rsakey_dict, DATA) + ed25519_signature = KEYS.create_signature(self.ed25519key_dict, DATA) - # Testing an invalid 'rsa_signature'. Same 'rsa_signature' is passed, with - # 'DATA' different than the original 'DATA' that was used - # in creating the 'rsa_signature'. Function should return 'False'. + # Verifying the 'signature' of 'DATA'. + verified = KEYS.verify_signature(self.rsakey_dict, rsa_signature, DATA) + self.assertTrue(verified, "Incorrect signature.") + + # Verifying the 'ed25519_signature' of 'DATA'. + verified = KEYS.verify_signature(self.ed25519key_dict, ed25519_signature, DATA) + self.assertTrue(verified, "Incorrect signature.") + + # Testing an invalid 'rsa_signature'. Same 'rsa_signature' is passed, with + # 'DATA' different than the original 'DATA' that was used + # in creating the 'rsa_signature'. Function should return 'False'. + + # Modifying 'DATA'. + _DATA = '1111' + DATA + '1111' - # Modifying 'DATA'. - _DATA = '1111' + DATA + '1111' - - # Verifying the 'signature' of modified '_DATA'. - verified = KEYS.verify_signature(self.rsakey_dict, rsa_signature, _DATA) - self.assertFalse(verified, - 'Returned \'True\' on an incorrect signature.') + # Verifying the 'signature' of modified '_DATA'. + verified = KEYS.verify_signature(self.rsakey_dict, rsa_signature, _DATA) + self.assertFalse(verified, + 'Returned \'True\' on an incorrect signature.') - # Modifying 'signature' to pass an incorrect method since only - # 'PyCrypto-PKCS#1 PSS' is accepted. - rsa_signature['method'] = 'Biff' + # Modifying 'signature' to pass an incorrect method since only + # 'PyCrypto-PKCS#1 PSS' is accepted. + rsa_signature['method'] = 'Biff' - args = (self.rsakey_dict, rsa_signature, DATA) - self.assertRaises(tuf.UnknownMethodError, KEYS.verify_signature, *args) + args = (self.rsakey_dict, rsa_signature, DATA) + self.assertRaises(tuf.UnknownMethodError, KEYS.verify_signature, *args) - # Passing incorrect number of arguments. - self.assertRaises(TypeError, KEYS.verify_signature) - - # Verify that the pure python 'ed25519' base case (triggered if 'pynacl' is - # unavailable) is executed in tuf.keys.verify_signature(). - KEYS._ED25519_CRYPTO_LIBRARY = 'invalid' - KEYS._available_crypto_libraries = ['invalid'] - verified = KEYS.verify_signature(self.ed25519key_dict, ed25519_signature, DATA) - self.assertTrue(verified, "Incorrect signature.") + # Passing incorrect number of arguments. + self.assertRaises(TypeError, KEYS.verify_signature) - # Reset to the expected available crypto libraries. - KEYS._ED25519_CRYPTO_LIBRARY = 'pynacl' - KEYS._available_crypto_libraries = ['ed25519', 'pycrypto', 'pynacl'] + # Verify that the pure python 'ed25519' base case (triggered if 'pynacl' is + # unavailable) is executed in tuf.keys.verify_signature(). + KEYS._ED25519_CRYPTO_LIBRARY = 'invalid' + KEYS._available_crypto_libraries = ['invalid'] + verified = KEYS.verify_signature(self.ed25519key_dict, ed25519_signature, DATA) + self.assertTrue(verified, "Incorrect signature.") + + # Reset to the expected available crypto libraries. + KEYS._ED25519_CRYPTO_LIBRARY = 'pynacl' + KEYS._available_crypto_libraries = default_available_libraries + KEYS._RSA_CRYPTO_LIBRARY = default_rsa_library + def test_create_rsa_encrypted_pem(self): From e0d5dd0ee147ae83d88c46ad13c4a675ac12b9a7 Mon Sep 17 00:00:00 2001 From: Vladimir Diaz Date: Thu, 1 Sep 2016 10:15:55 -0400 Subject: [PATCH 27/27] Minor edit to length of commented line --- tuf/keys.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tuf/keys.py b/tuf/keys.py index 5afc5d6d..5d4c3ab4 100755 --- a/tuf/keys.py +++ b/tuf/keys.py @@ -1444,8 +1444,8 @@ def create_rsa_encrypted_pem(private_key, passphrase): # Does 'passphrase' have the correct format? tuf.formats.PASSWORD_SCHEMA.check_match(passphrase) - # Raise 'tuf.UnsupportedLibraryError' if the following libraries, specified in - # 'tuf.conf', are unsupported or unavailable: + # Raise 'tuf.UnsupportedLibraryError' if the following libraries, specified + # in 'tuf.conf', are unsupported or unavailable: # 'tuf.conf.GENERAL_CRYPTO_LIBRARY' and 'tuf.conf.RSA_CRYPTO_LIBRARY'. check_crypto_libraries(['rsa', 'general'])