From 4cb851ca0a13683ee12bf712db34c0f80a9832bb Mon Sep 17 00:00:00 2001 From: Vladimir Diaz Date: Tue, 27 Oct 2015 16:11:11 -0400 Subject: [PATCH] Fix remaining issues with unit tests after implementing version numbers in snapshot.json --- tests/test_endless_data_attack.py | 2 +- tests/test_extraneous_dependencies_attack.py | 2 +- tests/test_formats.py | 66 +++---- tests/test_keydb.py | 8 +- tests/test_mix_and_match_attack.py | 2 +- tests/test_replay_attack.py | 3 +- tests/test_repository_lib.py | 17 +- tests/test_roledb.py | 12 +- tests/test_updater.py | 170 ++++++++++--------- tuf/client/updater.py | 109 ++++++++---- tuf/developer_tool.py | 3 +- tuf/repository_lib.py | 25 ++- tuf/repository_tool.py | 22 +-- 13 files changed, 253 insertions(+), 188 deletions(-) diff --git a/tests/test_endless_data_attack.py b/tests/test_endless_data_attack.py index 0c842016..afc12e27 100755 --- a/tests/test_endless_data_attack.py +++ b/tests/test_endless_data_attack.py @@ -282,7 +282,7 @@ def test_with_tuf(self): except tuf.NoWorkingMirrorError as exception: for mirror_url, mirror_error in six.iteritems(exception.mirror_errors): - self.assertTrue(isinstance(mirror_error, tuf.InvalidMetadataJSONError)) + self.assertTrue(isinstance(mirror_error, tuf.Error)) else: self.fail('TUF did not prevent an endless data attack.') diff --git a/tests/test_extraneous_dependencies_attack.py b/tests/test_extraneous_dependencies_attack.py index 2265ea70..2ff98971 100755 --- a/tests/test_extraneous_dependencies_attack.py +++ b/tests/test_extraneous_dependencies_attack.py @@ -223,7 +223,7 @@ def test_with_tuf(self): # Verify that 'role1.json' is the culprit. self.assertEqual(url_file, mirror_url) - self.assertTrue(isinstance(mirror_error, tuf.BadHashError)) + self.assertTrue(isinstance(mirror_error, tuf.ForbiddenTargetError)) else: self.fail('TUF did not prevent an extraneous dependencies attack.') diff --git a/tests/test_formats.py b/tests/test_formats.py index 1cc25c1c..162ed9c3 100755 --- a/tests/test_formats.py +++ b/tests/test_formats.py @@ -197,6 +197,7 @@ def test_schemas(self): {'_type': 'Root', 'version': 8, 'consistent_snapshot': False, + 'compression_algorithms': ['gz'], 'expires': '1985-10-21T13:20:00Z', 'keys': {'123abc': {'keytype': 'rsa', 'keyval': {'public': 'pubkey', @@ -223,17 +224,13 @@ def test_schemas(self): {'_type': 'Snapshot', 'version': 8, 'expires': '1985-10-21T13:20:00Z', - 'meta': {'metadata/snapshot.json': {'length': 1024, - 'hashes': {'sha256': 'ABCD123'}, - 'custom': {'type': 'metadata'}}}}), + 'meta': {'metadata/snapshot.json': {'version': 1024}}}), 'TIMESTAMP_SCHEMA': (tuf.formats.TIMESTAMP_SCHEMA, {'_type': 'Timestamp', 'version': 8, 'expires': '1985-10-21T13:20:00Z', - 'meta': {'metadata/timestamp.json': {'length': 1024, - 'hashes': {'sha256': 'ABCD123'}, - 'custom': {'type': 'metadata'}}}}), + 'meta': {'metadata/timestamp.json': {'version': 1024}}}), 'MIRROR_SCHEMA': (tuf.formats.MIRROR_SCHEMA, {'url_prefix': 'http://localhost:8001', @@ -303,29 +300,27 @@ def test_TimestampFile(self): # Test conditions for valid instances of 'tuf.formats.TimestampFile'. version = 8 expires = '1985-10-21T13:20:00Z' - filedict = {'metadata/timestamp.json': {'length': 1024, - 'hashes': {'sha256': 'ABCD123'}, - 'custom': {'type': 'metadata'}}} + versiondict = {'targets.json': {'version': version}} make_metadata = tuf.formats.TimestampFile.make_metadata from_metadata = tuf.formats.TimestampFile.from_metadata TIMESTAMP_SCHEMA = tuf.formats.TIMESTAMP_SCHEMA self.assertTrue(TIMESTAMP_SCHEMA.matches(make_metadata(version, expires, - filedict))) - metadata = make_metadata(version, expires, filedict) + versiondict))) + metadata = make_metadata(version, expires, versiondict) self.assertTrue(isinstance(from_metadata(metadata), tuf.formats.TimestampFile)) # Test conditions for invalid arguments. bad_version = 'eight' bad_expires = '2000' - bad_filedict = 123 + bad_versiondict = 123 self.assertRaises(tuf.FormatError, make_metadata, bad_version, - expires, filedict) + expires, versiondict) self.assertRaises(tuf.FormatError, make_metadata, version, - bad_expires, filedict) + bad_expires, versiondict) self.assertRaises(tuf.FormatError, make_metadata, version, - expires, bad_filedict) + expires, bad_versiondict) self.assertRaises(tuf.FormatError, from_metadata, 123) @@ -345,6 +340,8 @@ def test_RootFile(self): roledict = {'root': {'keyids': ['123abc'], 'threshold': 1, 'paths': ['path1/', 'path2']}} + + compression_algorithms = ['gz'] make_metadata = tuf.formats.RootFile.make_metadata from_metadata = tuf.formats.RootFile.from_metadata @@ -352,9 +349,10 @@ def test_RootFile(self): self.assertTrue(ROOT_SCHEMA.matches(make_metadata(version, expires, keydict, roledict, - consistent_snapshot))) + consistent_snapshot, + compression_algorithms))) metadata = make_metadata(version, expires, keydict, roledict, - consistent_snapshot) + consistent_snapshot, compression_algorithms) self.assertTrue(isinstance(from_metadata(metadata), tuf.formats.RootFile)) # Test conditions for invalid arguments. @@ -362,23 +360,28 @@ def test_RootFile(self): bad_expires = 'eight' bad_keydict = 123 bad_roledict = 123 + bad_compression_algorithms = 'nozip' self.assertRaises(tuf.FormatError, make_metadata, bad_version, expires, keydict, roledict, - consistent_snapshot) + consistent_snapshot, + compression_algorithms) self.assertRaises(tuf.FormatError, make_metadata, version, bad_expires, keydict, roledict, - consistent_snapshot) + consistent_snapshot, + compression_algorithms) self.assertRaises(tuf.FormatError, make_metadata, version, expires, bad_keydict, roledict, - consistent_snapshot) + consistent_snapshot, + compression_algorithms) self.assertRaises(tuf.FormatError, make_metadata, version, expires, keydict, bad_roledict, - consistent_snapshot) + consistent_snapshot, + compression_algorithms) self.assertRaises(tuf.FormatError, from_metadata, 'bad') @@ -388,30 +391,27 @@ def test_SnapshotFile(self): # Test conditions for valid instances of 'tuf.formats.SnapshotFile'. version = 8 expires = '1985-10-21T13:20:00Z' - - filedict = {'metadata/snapshot.json': {'length': 1024, - 'hashes': {'sha256': 'ABCD123'}, - 'custom': {'type': 'metadata'}}} - + versiondict = {'targets.json' : {'version': version}} + make_metadata = tuf.formats.SnapshotFile.make_metadata from_metadata = tuf.formats.SnapshotFile.from_metadata SNAPSHOT_SCHEMA = tuf.formats.SNAPSHOT_SCHEMA self.assertTrue(SNAPSHOT_SCHEMA.matches(make_metadata(version, expires, - filedict))) - metadata = make_metadata(version, expires, filedict) + versiondict))) + metadata = make_metadata(version, expires, versiondict) self.assertTrue(isinstance(from_metadata(metadata), tuf.formats.SnapshotFile)) # Test conditions for invalid arguments. bad_version = '8' bad_expires = '2000' - bad_filedict = 123 + bad_versiondict = 123 self.assertRaises(tuf.FormatError, make_metadata, version, - expires, bad_filedict) + expires, bad_versiondict) self.assertRaises(tuf.FormatError, make_metadata, bad_version, expires, - filedict) + versiondict) self.assertRaises(tuf.FormatError, make_metadata, version, bad_expires, - bad_filedict) + bad_versiondict) self.assertRaises(tuf.FormatError, from_metadata, 123) @@ -548,6 +548,7 @@ def test_make_signable(self): root = {'_type': 'Root', 'version': 8, 'consistent_snapshot': False, + 'compression_algorithms': ['gz'], 'expires': '1985-10-21T13:20:00Z', 'keys': {'123abc': {'keytype': 'rsa', 'keyval': {'public': 'pubkey', @@ -679,6 +680,7 @@ def test_check_signable_object_format(self): root = {'_type': 'Root', 'version': 8, 'consistent_snapshot': False, + 'compression_algorithms': ['gz'], 'expires': '1985-10-21T13:20:00Z', 'keys': {'123abc': {'keytype': 'rsa', 'keyval': {'public': 'pubkey', diff --git a/tests/test_keydb.py b/tests/test_keydb.py index 394e66a6..d80b0257 100755 --- a/tests/test_keydb.py +++ b/tests/test_keydb.py @@ -190,11 +190,13 @@ def test_create_keydb_from_root_metadata(self): version = 8 consistent_snapshot = False expires = '1985-10-21T01:21:00Z' + compression_algorithms = ['gz'] root_metadata = tuf.formats.RootFile.make_metadata(version, expires, keydict, roledict, - consistent_snapshot) + consistent_snapshot, + compression_algorithms) self.assertEqual(None, tuf.keydb.create_keydb_from_root_metadata(root_metadata)) tuf.keydb.create_keydb_from_root_metadata(root_metadata) @@ -230,11 +232,13 @@ def test_create_keydb_from_root_metadata(self): keydict[keyid3] = rsakey3 version = 8 expires = '1985-10-21T01:21:00Z' + compression_algorithms = ['gz'] root_metadata = tuf.formats.RootFile.make_metadata(version, expires, keydict, roledict, - consistent_snapshot) + consistent_snapshot, + compression_algorithms) self.assertEqual(None, tuf.keydb.create_keydb_from_root_metadata(root_metadata)) # Ensure only 'keyid2' was added to the keydb database. 'keyid' and diff --git a/tests/test_mix_and_match_attack.py b/tests/test_mix_and_match_attack.py index c2de9838..01d82e3c 100755 --- a/tests/test_mix_and_match_attack.py +++ b/tests/test_mix_and_match_attack.py @@ -248,7 +248,7 @@ def test_with_tuf(self): # Verify that 'timestamp.json' is the culprit. self.assertEqual(url_file, mirror_url) - self.assertTrue(isinstance(mirror_error, tuf.BadHashError)) + self.assertTrue(isinstance(mirror_error, tuf.BadVersionNumberError)) else: self.fail('TUF did not prevent a mix-and-match attack.') diff --git a/tests/test_replay_attack.py b/tests/test_replay_attack.py index 6251c8c5..f94d04d2 100755 --- a/tests/test_replay_attack.py +++ b/tests/test_replay_attack.py @@ -289,7 +289,7 @@ def test_with_tuf(self): # version. repository.timestamp.expiration = datetime.datetime(2030, 1, 1, 12, 12) repository.write() - + # Move the staged metadata to the "live" metadata. shutil.rmtree(os.path.join(self.repository_directory, 'metadata')) shutil.copytree(os.path.join(self.repository_directory, 'metadata.staged'), @@ -315,6 +315,7 @@ def test_with_tuf(self): # Restore the previous version of 'timestamp.json' on the remote repository # and verify that the non-TUF client downloads it (expected, but not ideal). shutil.move(backup_timestamp, timestamp_path) + logger.info('Moving the backup timestamp to the current version.') # Verify that the TUF client detects replayed metadata and refuses to # continue the update process. diff --git a/tests/test_repository_lib.py b/tests/test_repository_lib.py index d4a35ed9..fd9ef9fe 100755 --- a/tests/test_repository_lib.py +++ b/tests/test_repository_lib.py @@ -620,27 +620,20 @@ def test_generate_timestamp_metadata(self): version = 1 expiration_date = '1985-10-21T13:20:00Z' - compression_algorithms = ['gz'] - snapshot_metadata = \ repo_lib.generate_timestamp_metadata(snapshot_filename, version, - expiration_date, - compression_algorithms) + expiration_date) self.assertTrue(tuf.formats.TIMESTAMP_SCHEMA.matches(snapshot_metadata)) # Test improperly formatted arguments. self.assertRaises(tuf.FormatError, repo_lib.generate_timestamp_metadata, - 3, version, expiration_date, compression_algorithms) + 3, version, expiration_date) self.assertRaises(tuf.FormatError, repo_lib.generate_timestamp_metadata, - snapshot_filename, '3', expiration_date, - compression_algorithms) + snapshot_filename, '3', expiration_date) self.assertRaises(tuf.FormatError, repo_lib.generate_timestamp_metadata, - snapshot_filename, version, '3', compression_algorithms) - self.assertRaises(tuf.FormatError, repo_lib.generate_timestamp_metadata, - snapshot_filename, version, expiration_date, 3) - self.assertRaises(tuf.FormatError, repo_lib.generate_timestamp_metadata, - snapshot_filename, version, expiration_date, ['compress']) + snapshot_filename, version, '3') + diff --git a/tests/test_roledb.py b/tests/test_roledb.py index 74dd7e09..5c3caeb4 100755 --- a/tests/test_roledb.py +++ b/tests/test_roledb.py @@ -329,12 +329,14 @@ def test_create_roledb_from_root_metadata(self): 'targets': {'keyids': [keyid2], 'threshold': 1}} version = 8 consistent_snapshot = False - expires = '1985-10-21T01:21:00Z' + expires = '1985-10-21T01:21:00Z' + compression_algorithms = ['gz'] root_metadata = tuf.formats.RootFile.make_metadata(version, expires, keydict, roledict, - consistent_snapshot) + consistent_snapshot, + compression_algorithms) self.assertEqual(None, tuf.roledb.create_roledb_from_root_metadata(root_metadata)) # Ensure 'Root' and 'Targets' were added to the role database. @@ -372,7 +374,8 @@ def test_create_roledb_from_root_metadata(self): root_metadata = tuf.formats.RootFile.make_metadata(version, expires, keydict, roledict, - consistent_snapshot) + consistent_snapshot, + compression_algorithms) self.assertRaises(tuf.Error, tuf.roledb.create_roledb_from_root_metadata, root_metadata) # Remove the invalid role and re-generate 'root_metadata' to test for the @@ -381,7 +384,8 @@ def test_create_roledb_from_root_metadata(self): root_metadata = tuf.formats.RootFile.make_metadata(version, expires, keydict, roledict, - consistent_snapshot) + consistent_snapshot, + compression_algorithms) self.assertEqual(None, tuf.roledb.create_roledb_from_root_metadata(root_metadata)) diff --git a/tests/test_updater.py b/tests/test_updater.py index 692e41e4..191a6da7 100755 --- a/tests/test_updater.py +++ b/tests/test_updater.py @@ -547,7 +547,10 @@ def test_3__update_metadata(self): # This is the default metadata that we would create for the timestamp role, # because it has no signed metadata for itself. DEFAULT_TIMESTAMP_FILELENGTH = tuf.conf.DEFAULT_TIMESTAMP_REQUIRED_LENGTH - + + # This is the the upper bound length for Targets metadata. + DEFAULT_TARGETS_FILELENGTH = tuf.conf.DEFAULT_TARGETS_REQUIRED_LENGTH + # Save the versioninfo of 'targets.json,' needed later when re-installing # with _update_metadata(). targets_versioninfo = \ @@ -557,7 +560,9 @@ def test_3__update_metadata(self): # Remove the currently installed metadata from the store and disk. Verify # that the metadata dictionary is re-populated after calling # _update_metadata(). - self.repository_updater.metadata['current'].clear() + del self.repository_updater.metadata['current']['timestamp'] + del self.repository_updater.metadata['current']['targets'] + timestamp_filepath = \ os.path.join(self.client_metadata_current, 'timestamp.json') targets_filepath = os.path.join(self.client_metadata_current, 'targets.json') @@ -575,7 +580,9 @@ def test_3__update_metadata(self): # Verify 'targets.json' is properly installed. self.assertFalse('targets' in self.repository_updater.metadata['current']) - self.repository_updater._update_metadata('targets', targets_versioninfo) + self.repository_updater._update_metadata('targets', + DEFAULT_TARGETS_FILELENGTH, + targets_versioninfo['version']) self.assertTrue('targets' in self.repository_updater.metadata['current']) targets_signable = tuf.util.load_json_file(targets_filepath) @@ -590,45 +597,47 @@ def test_3__update_metadata(self): # Verify 'targets.json.gz' is properly intalled. Note: The uncompressed # version is installed if the compressed one is downloaded. self.assertFalse('targets' in self.repository_updater.metadata['current']) - self.repository_updater._update_metadata('targets', targets_fileinfo, 'gzip', - targets_compressed_fileinfo) + self.repository_updater._update_metadata('targets', + DEFAULT_TARGETS_FILELENGTH, + targets_versioninfo['version'], + 'gzip') self.assertTrue('targets' in self.repository_updater.metadata['current']) - length, hashes = tuf.util.get_file_details(targets_filepath) - self.assertEqual(targets_fileinfo, tuf.formats.make_fileinfo(length, hashes)) + self.assertEqual(targets_versioninfo['version'], + self.repository_updater.metadata['current']['targets']['version']) - # Test: Invalid fileinfo. - # Invalid fileinfo for the uncompressed version of 'targets.json'. + # Test: Invalid version numbers. + # Invalid version number for the uncompressed version of 'targets.json'. self.assertRaises(tuf.NoWorkingMirrorError, self.repository_updater._update_metadata, - 'targets', targets_compressed_fileinfo) + 'targets', DEFAULT_TARGETS_FILELENGTH, 88) # Verify that the specific exception raised is correct for the previous # case. try: self.repository_updater._update_metadata('targets', - targets_compressed_fileinfo) + DEFAULT_TARGETS_FILELENGTH, 88) except tuf.NoWorkingMirrorError as e: for mirror_error in six.itervalues(e.mirror_errors): - assert isinstance(mirror_error, tuf.BadHashError) + assert isinstance(mirror_error, tuf.BadVersionNumberError) - # Invalid fileinfo for the compressed version of 'targets.json' + # Invalid version number for the compressed version of 'targets.json' self.assertRaises(tuf.NoWorkingMirrorError, self.repository_updater._update_metadata, - 'targets', targets_compressed_fileinfo, 'gzip', - targets_fileinfo) + 'targets', DEFAULT_TARGETS_FILELENGTH, 88, + 'gzip') # Verify that the specific exception raised is correct for the previous - # case. The length is checked before the hashes, so the specific error in + # case. The version number is checked before the hashes, so the specific error in # this case should be 'tuf.DownloadLengthMismatchError'. try: self.repository_updater._update_metadata('targets', - targets_compressed_fileinfo, - 'gzip', targets_fileinfo) + DEFAULT_TARGETS_FILELENGTH, + 88, 'gzip') except tuf.NoWorkingMirrorError as e: for mirror_error in six.itervalues(e.mirror_errors): - assert isinstance(mirror_error, tuf.DownloadLengthMismatchError) + assert isinstance(mirror_error, tuf.BadVersionNumberError) @@ -652,7 +661,6 @@ def test_3__update_metadata_if_changed(self): # Verify the current version of 'targets.json' has not changed. self.assertEqual(self.repository_updater.metadata['current']['targets']['version'], 1) - # Modify one target file on the remote repository. repository = repo_tool.load_repository(self.repository_directory) target3 = os.path.join(self.repository_directory, 'targets', 'file3.txt') @@ -668,12 +676,12 @@ def test_3__update_metadata_if_changed(self): shutil.copytree(os.path.join(self.repository_directory, 'metadata.staged'), os.path.join(self.repository_directory, 'metadata')) - # Update 'targets.json' and verify that the client's current 'targets.json' # has been updated. 'timestamp' and 'snapshot' must be manually updated # so that new 'targets' may be recognized. DEFAULT_TIMESTAMP_FILELENGTH = tuf.conf.DEFAULT_TIMESTAMP_REQUIRED_LENGTH + logger.info('Attempting to increment targets to version 2...') self.repository_updater._update_metadata('timestamp', DEFAULT_TIMESTAMP_FILELENGTH) self.repository_updater._update_metadata_if_changed('snapshot', 'timestamp') self.repository_updater._update_metadata_if_changed('targets') @@ -1013,68 +1021,65 @@ def test_6_download_target(self): target_filepaths = \ list(self.repository_updater.metadata['current']['targets']['targets'].keys()) - # Test: normal case. # Get the target info, which is an argument to 'download_target()'. - for target_filepath in target_filepaths: - target_fileinfo = self.repository_updater.target(target_filepath) - self.repository_updater.download_target(target_fileinfo, - destination_directory) + + # 'target_filepaths' is expected to have at least two targets. The first + # target will be used to test against download_target(). The second + # will be used to test against download_target() and a repository with + # consistent snapshots. + target_filepath1 = target_filepaths.pop() + target_fileinfo = self.repository_updater.target(target_filepath1) + self.repository_updater.download_target(target_fileinfo, + destination_directory) - download_filepath = \ - os.path.join(destination_directory, target_filepath.lstrip('/')) - self.assertTrue(os.path.exists(download_filepath)) - length, hashes = tuf.util.get_file_details(download_filepath) - download_targetfileinfo = tuf.formats.make_fileinfo(length, hashes) + download_filepath = \ + os.path.join(destination_directory, target_filepath1.lstrip('/')) + self.assertTrue(os.path.exists(download_filepath)) + length, hashes = tuf.util.get_file_details(download_filepath) + download_targetfileinfo = tuf.formats.make_fileinfo(length, hashes) + + # Add any 'custom' data from the repository's target fileinfo to the + # 'download_targetfileinfo' object being tested. + if 'custom' in target_fileinfo['fileinfo']: + download_targetfileinfo['custom'] = target_fileinfo['fileinfo']['custom'] + self.assertEqual(target_fileinfo['fileinfo'], download_targetfileinfo) + + # Test when consistent snapshots is set. First, create a valid + # repository with consistent snapshots set (root.json contains a + # "consistent_snapshot" entry that the updater uses to correctly fetch + # snapshots. The updater expects the existence of + # .filename files if root.json sets 'consistent_snapshot + # = True'. + + # The repository must be rewritten with consistent snapshots set. + repository = repo_tool.load_repository(self.repository_directory) + + repository.root.load_signing_key(self.role_keys['root']['private']) + repository.snapshot.load_signing_key(self.role_keys['snapshot']['private']) + repository.timestamp.load_signing_key(self.role_keys['timestamp']['private']) - # Add any 'custom' data from the repository's target fileinfo to the - # 'download_targetfileinfo' object being tested. - if 'custom' in target_fileinfo['fileinfo']: - download_targetfileinfo['custom'] = target_fileinfo['fileinfo']['custom'] - self.assertEqual(target_fileinfo['fileinfo'], download_targetfileinfo) - - # Test when consistent snapshots is set. First, create a valid - # repository with consistent snapshots set (root.json contains a - # "consistent_snapshots" entry that the updater uses to correctly fetch - # snapshots. The updater expects the existence of .filename files - # if root.json sets 'consistent_snapshot = True'. - - # The repository must be rewritten with consistent snapshots set. - repository = repo_tool.load_repository(self.repository_directory) - - repository.root.load_signing_key(self.role_keys['root']['private']) - repository.snapshot.load_signing_key(self.role_keys['snapshot']['private']) - repository.timestamp.load_signing_key(self.role_keys['timestamp']['private']) - repository.write(consistent_snapshot=True) - - # Move the staged metadata to the "live" metadata. - shutil.rmtree(os.path.join(self.repository_directory, 'metadata')) - shutil.copytree(os.path.join(self.repository_directory, 'metadata.staged'), - os.path.join(self.repository_directory, 'metadata')) - - self.repository_updater.refresh() - - # self.repository_updater.consistent_snapshot = True - - self.repository_updater.download_target(target_fileinfo, - destination_directory) + repository.write(consistent_snapshot=True) + + # Move the staged metadata to the "live" metadata. + shutil.rmtree(os.path.join(self.repository_directory, 'metadata')) + shutil.copytree(os.path.join(self.repository_directory, 'metadata.staged'), + os.path.join(self.repository_directory, 'metadata')) + + self.repository_updater.refresh() + + target_filepath2 = target_filepaths.pop() + target_fileinfo2 = self.repository_updater.target(target_filepath2) + self.repository_updater.download_target(target_fileinfo2, + destination_directory) # Test: Invalid arguments. self.assertRaises(tuf.FormatError, self.repository_updater.download_target, 8, destination_directory) - random_target_filepath = target_filepaths.pop() - target_fileinfo = self.repository_updater.target(random_target_filepath) self.assertRaises(tuf.FormatError, self.repository_updater.download_target, target_fileinfo, 8) - # Non-existent destination. - # TODO: test for non-existent directories. - """ - self.assertRaises(tuf.Error, self.repository_updater.download_target, - target_fileinfo, 'non-existent/bad_path') - """ - # Test: # Attempt a file download of a valid target, however, a download exception # occurs because the target is not within the mirror's confined target @@ -1100,15 +1105,16 @@ def test_6_download_target(self): def test_7_updated_targets(self): - # Verify that list contains all files that need to be updated, these - # files include modified and new target files. Also, confirm that files - # than need not to be updated are absent from the list. + # Verify that the list of targets returned by updated_targets() contains + # all the files that need to be updated, these files include modified and + # new target files. Also, confirm that files than need not to be updated + # are absent from the list. # Setup # Create temporary directory which will hold client's target files. destination_directory = self.make_temp_directory() - # Get the list of target files. It will be used as an argument to - # 'updated_targets' function. + # Get the list of target files. It will be used as an argument to the + # 'updated_targets()' function. all_targets = self.repository_updater.all_targets() # Test for duplicates and targets in the root directory of the repository. @@ -1161,9 +1167,18 @@ def test_7_updated_targets(self): target1 = os.path.join(self.repository_directory, 'targets', 'file1.txt') repository.targets.remove_target(target1) + + length, hashes = tuf.util.get_file_details(target1) + + repository.targets.add_target(target1) + repository.targets.load_signing_key(self.role_keys['targets']['private']) + repository.snapshot.load_signing_key(self.role_keys['snapshot']['private']) + with open(target1, 'a') as file_object: file_object.write('append extra text') + length, hashes = tuf.util.get_file_details(target1) + repository.targets.add_target(target1) repository.targets.load_signing_key(self.role_keys['targets']['private']) repository.snapshot.load_signing_key(self.role_keys['snapshot']['private']) @@ -1175,7 +1190,8 @@ def test_7_updated_targets(self): shutil.copytree(os.path.join(self.repository_directory, 'metadata.staged'), os.path.join(self.repository_directory, 'metadata')) - # Ensure the client has the up-to-date metadata. + # Ensure the client has up-to-date metadata. + logger.info('refreshing top-level metadata after updating targets.json..') self.repository_updater.refresh() # Verify that the new target file is considered updated. diff --git a/tuf/client/updater.py b/tuf/client/updater.py index 31323710..3b08c0ed 100755 --- a/tuf/client/updater.py +++ b/tuf/client/updater.py @@ -921,10 +921,13 @@ def _verify_uncompressed_metadata_file(self, metadata_file_object, # Is 'metadata_signable' expired? self._ensure_not_expired(metadata_signable['signed'], metadata_role) + """ + # Is 'metadata_signable' newer than the currently installed # version? current_metadata_role = self.metadata['current'].get(metadata_role) + # Compare metadata version numbers. Ensure there is a current # version of the metadata role to be updated. if current_metadata_role is not None: @@ -933,6 +936,12 @@ def _verify_uncompressed_metadata_file(self, metadata_file_object, if downloaded_version < current_version: raise tuf.ReplayedMetadataError(metadata_role, downloaded_version, current_version) + else: + logger.info('current_version >= downloaded_version') + + else: + logger.info('current_metadata_role is None') + """ # Reject the metadata if any specified targets are not allowed. # 'tuf.ForbiddenTargetError' raised if any of the targets of 'metadata_role' @@ -1114,29 +1123,44 @@ def _get_metadata_file(self, metadata_role, remote_filename, # Verify 'file_object' according to the callable function. # 'file_object' is also verified if decompressed above (i.e., the # uncompressed version). - metadata_signable = \ tuf.util.load_json_string(file_object.read().decode('utf-8')) # If the version number is unspecified, ensure that the version number - # downloaded is greater than the currently trusted version number. + # downloaded is greater than the currently trusted version number for + # 'metadata_role'. version_downloaded = metadata_signable['signed']['version'] - if expected_version is None: - if version_downloaded <= expected_version: - message = \ - 'Downloaded version number: ' + repr(version_downloaded) + '.' \ - ' Version number MUST be greater than: ' + repr(expected_version) - raise tuf.BadVersionNumberError(message) - - # Otherwise, verify that the downloaded version matches the version - # requested. - else: + + if expected_version is not None: + # Verify that the downloaded version matches the version expected by + # the caller. if version_downloaded != expected_version: message = \ 'Downloaded version number: ' + repr(version_downloaded) + '.' \ ' Version number MUST be: ' + repr(expected_version) raise tuf.BadVersionNumberError(message) - + + # The caller does not know which version to download. Verify that the + # downloaded version is at least greater than the one locally available. + else: + # Verify that the version number of the locally stored + # 'timestamp.json', if available, is less than what was downloaded. + # Otherwise, accept the new timestamp with version number + # 'version_downloaded'. + logger.info('metadata_role: ' + repr(metadata_role)) + try: + current_version = \ + self.metadata['current'][metadata_role]['version'] + + if version_downloaded < current_version: + raise tuf.ReplayedMetadataError(metadata_role, version_downloaded, + current_version) + + except KeyError: + logger.info(metadata_role + ' not available locally.') + + self._verify_uncompressed_metadata_file(file_object, metadata_role) + except Exception as exception: # Remember the error from this mirror, and "reset" the target file. logger.exception('Update failed from ' + file_mirror + '.') @@ -1436,7 +1460,8 @@ def _update_metadata(self, metadata_role, upperbound_filelength, version=None, filename_version = version dirname, basename = os.path.split(remote_filename) remote_filename = os.path.join(dirname, str(filename_version) + '.' + basename) - + + logger.info('Verifying ' + repr(metadata_role) + ' requesting version: ' + repr(version)) metadata_file_object = \ self._get_metadata_file(metadata_role, remote_filename, upperbound_filelength, version, compression) @@ -1703,7 +1728,7 @@ def _versioninfo_has_changed(self, metadata_filename, new_versioninfo): Boolean. True if the versioninfo has increased, false otherwise. """ - + # If there is no versioninfo currently stored for 'metadata_filename', # try to load the file, calculate the versioninfo, and store it. if metadata_filename not in self.versioninfo: @@ -1786,13 +1811,36 @@ def _update_versioninfo(self, metadata_filename): trusted_versioninfo = \ self.metadata['current']['timestamp']['version'] + # When updating snapshot.json, the client either (1) has a copy of + # snapshot.json, or (2) in the process of obtaining it by first downloading + # timestamp.json. Note: Clients may have only root.json and perform a + # refresh of top-level metadata to obtain the remaining roles. elif metadata_filename == 'snapshot.json': - trusted_versioninfo = \ - self.metadata['current']['timestamp']['meta']['snapshot.json'] + + # Verify the version number of the currently trusted snapshot.json in + # snapshot.json itself. Checking the version number specified in + # timestamp.json may be greater than the version specified in the + # client's copy of snapshot.json. + try: + timestamp_version_number = self.metadata['current']['snapshot']['version'] + trusted_versioninfo = tuf.formats.make_versioninfo(timestamp_version_number) + + except KeyError: + trusted_versioninfo = \ + self.metadata['current']['timestamp']['meta']['snapshot.json'] else: - trusted_versioninfo = \ - self.metadata['current']['snapshot']['meta'][metadata_filename] + + try: + # The metadata file names in 'self.metadata' exclude the role + # extension. Strip the '.json' extension when checking if + # 'metadata_filename' currently exists. + targets_version_number = self.metadata['current'][metadata_filename[:-len('.json')]]['version'] + trusted_versioninfo = tuf.formats.make_versioninfo(targets_version_number) + + except KeyError: + trusted_versioninfo = \ + self.metadata['current']['snapshot']['meta'][metadata_filenamed] self.versioninfo[metadata_filename] = trusted_versioninfo @@ -2841,20 +2889,15 @@ def download_target(self, target, destination_directory): target_filepath.lstrip(os.sep)) destination = os.path.abspath(destination) target_dirpath = os.path.dirname(destination) - if target_dirpath: - try: - os.makedirs(target_dirpath) - - except OSError as e: - if e.errno == errno.EEXIST: - pass - - else: - raise - else: - message = repr(target_dirpath) + ' does not exist.' - logger.warning(message) - raise tuf.Error(message) + try: + os.makedirs(target_dirpath) + + except OSError as e: + if e.errno == errno.EEXIST: + pass + + else: + raise target_file_object.move(destination) diff --git a/tuf/developer_tool.py b/tuf/developer_tool.py index 4557c412..13ea3c6f 100755 --- a/tuf/developer_tool.py +++ b/tuf/developer_tool.py @@ -526,7 +526,8 @@ def _generate_and_write_metadata(rolename, metadata_filename, write_partial, if tuf.sig.verify(signable, rolename) or write_partial: _remove_invalid_and_duplicate_signatures(signable) compressions = roleinfo['compressions'] - filename = write_metadata_file(signable, metadata_filename, compressions, + filename = write_metadata_file(signable, metadata_filename, + metadata['version'], compressions, False) # 'signable' contains an invalid threshold of signatures. diff --git a/tuf/repository_lib.py b/tuf/repository_lib.py index 07eb80ea..2bf65e86 100755 --- a/tuf/repository_lib.py +++ b/tuf/repository_lib.py @@ -121,7 +121,6 @@ def _generate_and_write_metadata(rolename, metadata_filename, write_partial, # Retrieve the roleinfo of 'rolename' to extract the needed metadata # attributes, such as version number, expiration, etc. roleinfo = tuf.roledb.get_roleinfo(rolename) - snapshot_compressions = tuf.roledb.get_roleinfo('snapshot')['compressions'] # Generate the appropriate role metadata for 'rolename'. if rolename == 'root': @@ -140,6 +139,7 @@ def _generate_and_write_metadata(rolename, metadata_filename, write_partial, roleinfo['expires'], roleinfo['delegations'], consistent_snapshot) + if rolename == 'targets': _log_warning_if_expires_soon(TARGETS_FILENAME, roleinfo['expires'], TARGETS_EXPIRES_WARN_SECONDS) @@ -152,6 +152,7 @@ def _generate_and_write_metadata(rolename, metadata_filename, write_partial, roleinfo['expires'], root_filename, targets_filename, consistent_snapshot) + _log_warning_if_expires_soon(SNAPSHOT_FILENAME, roleinfo['expires'], SNAPSHOT_EXPIRES_WARN_SECONDS) @@ -160,8 +161,7 @@ def _generate_and_write_metadata(rolename, metadata_filename, write_partial, snapshot_filename = filenames['snapshot'] metadata = generate_timestamp_metadata(snapshot_filename, roleinfo['version'], - roleinfo['expires'], - snapshot_compressions) + roleinfo['expires']) _log_warning_if_expires_soon(TIMESTAMP_FILENAME, roleinfo['expires'], TIMESTAMP_EXPIRES_WARN_SECONDS) @@ -205,9 +205,9 @@ def _generate_and_write_metadata(rolename, metadata_filename, write_partial, metadata['version'], compressions, consistent_snapshot) - # The root and timestamp files should also be written without a digest if - # 'consistent_snaptshots' is True. Client may request a timestamp and root - # file without knowing its digest and file size. + # The root and timestamp files should also be written without a version + # number prepended if 'consistent_snaptshots' is True. Client may request + # a timestamp and root file without knowing its version number. if rolename == 'root' or rolename == 'timestamp': write_metadata_file(signable, metadata_filename, metadata['version'], compressions, consistent_snapshot=False) @@ -218,7 +218,7 @@ def _generate_and_write_metadata(rolename, metadata_filename, write_partial, message = 'Not enough signatures for ' + repr(metadata_filename) raise tuf.UnsignedMetadataError(message, signable) - return signable, filename + return signable, filename @@ -578,7 +578,7 @@ def _load_top_level_metadata(repository, top_level_filenames): consistent_snapshot = root_metadata['consistent_snapshot'] else: - message = 'Cannot load the required root file: '+repr(root_filename) + message = 'Cannot load the required root file: ' + repr(root_filename) raise tuf.RepositoryError(message) # Load 'timestamp.json'. A Timestamp role file without a version number is @@ -1542,7 +1542,7 @@ def generate_targets_metadata(targets_directory, target_files, version, # Note: join() discards 'targets_directory' if 'target' contains a leading # path separator (i.e., is treated as an absolute path). target_path = os.path.join(targets_directory, target.lstrip(os.sep)) - + # Ensure all target files listed in 'target_files' exist. If just one of # these files does not exist, raise an exception. if not os.path.exists(target_path): @@ -1711,7 +1711,7 @@ def generate_snapshot_metadata(metadata_directory, version, expiration_date, def generate_timestamp_metadata(snapshot_filename, version, - expiration_date, compressions=()): + expiration_date): """ Generate the timestamp metadata object. The 'snapshot.json' file must @@ -1755,7 +1755,6 @@ def generate_timestamp_metadata(snapshot_filename, version, tuf.formats.PATH_SCHEMA.check_match(snapshot_filename) tuf.formats.METADATAVERSION_SCHEMA.check_match(version) tuf.formats.ISO8601_DATETIME_SCHEMA.check_match(expiration_date) - tuf.formats.COMPRESSIONS_SCHEMA.check_match(compressions) # Retrieve the versioninfo of the Snapshot metadata file. versioninfo = {} @@ -1764,8 +1763,8 @@ def generate_timestamp_metadata(snapshot_filename, version, # Save the versioninfo of the compressed versions of 'timestamp.json' # in 'versioninfo'. Log the files included in 'fileinfo'. # TODO: Since version numbers are now stored, the version numbers of - # compressed roles do not change and can thus be excluded. Remove this - # after testing. + # compressed roles do not change and can thus be excluded. Remove the + # following commented lines after testing. """ for file_extension in compressions: if not len(file_extension): diff --git a/tuf/repository_tool.py b/tuf/repository_tool.py index 082c68ae..11903a78 100755 --- a/tuf/repository_tool.py +++ b/tuf/repository_tool.py @@ -197,10 +197,10 @@ def write(self, write_partial=False, consistent_snapshot=False, consistent_snapshot: A boolean indicating whether written metadata and target files should - include a digest in the filename (i.e., .root.json, - .targets.json.gz, .README.json, where is the - file's SHA256 digest. Example: - 1f4e35a60c8f96d439e27e858ce2869c770c1cdd54e1ef76657ceaaf01da18a3.root.json' + include a version number in the filename (i.e., + .root.json, .targets.json.gz, + .README.json, where is the file's + SHA256 digest. Example: 13.root.json' compressions: A list of compression algorithms. Each of these algorithms will be @@ -271,7 +271,7 @@ def write(self, write_partial=False, consistent_snapshot=False, repo_lib._generate_and_write_metadata('root', root_filename, write_partial, self._targets_directory, self._metadata_directory, - consistent_snapshot, compressions) + consistent_snapshot) # Generate the 'targets.json' metadata file. targets_filename = repo_lib.TARGETS_FILENAME @@ -2704,9 +2704,9 @@ def load_repository(repository_directory): filenames = repo_lib.get_metadata_filenames(metadata_directory) - # The Root file is always available without a consistent snapshots digest - # attached to the filename. Store the 'consistent_snapshot' value read the - # loaded Root file so that other metadata files may be located. + # The Root file is always available without a consistent snapshots version + # number attached to the filename. Store the 'consistent_snapshot' value + # read the loaded Root file so that other metadata files may be located. # 'consistent_snapshot' value. consistent_snapshot = False @@ -2747,10 +2747,10 @@ def load_repository(repository_directory): else: continue - + # Keep a store metadata previously loaded metadata to prevent # re-loading duplicate versions. Duplicate versions may occur with - # consistent_snapshot, where the same metadata may be available in + # 'consistent_snapshot', where the same metadata may be available in # multiples files (the different hash is included in each filename. if metadata_name in loaded_metadata: continue @@ -2766,6 +2766,8 @@ def load_repository(repository_directory): # Extract the metadata attributes 'metadata_name' and update its # corresponding roleinfo. + # TODO: Test for detection of the Targets role. + roleinfo = tuf.roledb.get_roleinfo(metadata_name) roleinfo['signatures'].extend(signable['signatures']) roleinfo['version'] = metadata_object['version']