Fix remaining issues with unit tests after implementing version numbers in snapshot.json

This commit is contained in:
Vladimir Diaz 2015-10-27 16:11:11 -04:00
parent 987411ff04
commit 4cb851ca0a
13 changed files with 253 additions and 188 deletions

View file

@ -282,7 +282,7 @@ def test_with_tuf(self):
except tuf.NoWorkingMirrorError as exception:
for mirror_url, mirror_error in six.iteritems(exception.mirror_errors):
self.assertTrue(isinstance(mirror_error, tuf.InvalidMetadataJSONError))
self.assertTrue(isinstance(mirror_error, tuf.Error))
else:
self.fail('TUF did not prevent an endless data attack.')

View file

@ -223,7 +223,7 @@ def test_with_tuf(self):
# Verify that 'role1.json' is the culprit.
self.assertEqual(url_file, mirror_url)
self.assertTrue(isinstance(mirror_error, tuf.BadHashError))
self.assertTrue(isinstance(mirror_error, tuf.ForbiddenTargetError))
else:
self.fail('TUF did not prevent an extraneous dependencies attack.')

View file

@ -197,6 +197,7 @@ def test_schemas(self):
{'_type': 'Root',
'version': 8,
'consistent_snapshot': False,
'compression_algorithms': ['gz'],
'expires': '1985-10-21T13:20:00Z',
'keys': {'123abc': {'keytype': 'rsa',
'keyval': {'public': 'pubkey',
@ -223,17 +224,13 @@ def test_schemas(self):
{'_type': 'Snapshot',
'version': 8,
'expires': '1985-10-21T13:20:00Z',
'meta': {'metadata/snapshot.json': {'length': 1024,
'hashes': {'sha256': 'ABCD123'},
'custom': {'type': 'metadata'}}}}),
'meta': {'metadata/snapshot.json': {'version': 1024}}}),
'TIMESTAMP_SCHEMA': (tuf.formats.TIMESTAMP_SCHEMA,
{'_type': 'Timestamp',
'version': 8,
'expires': '1985-10-21T13:20:00Z',
'meta': {'metadata/timestamp.json': {'length': 1024,
'hashes': {'sha256': 'ABCD123'},
'custom': {'type': 'metadata'}}}}),
'meta': {'metadata/timestamp.json': {'version': 1024}}}),
'MIRROR_SCHEMA': (tuf.formats.MIRROR_SCHEMA,
{'url_prefix': 'http://localhost:8001',
@ -303,29 +300,27 @@ def test_TimestampFile(self):
# Test conditions for valid instances of 'tuf.formats.TimestampFile'.
version = 8
expires = '1985-10-21T13:20:00Z'
filedict = {'metadata/timestamp.json': {'length': 1024,
'hashes': {'sha256': 'ABCD123'},
'custom': {'type': 'metadata'}}}
versiondict = {'targets.json': {'version': version}}
make_metadata = tuf.formats.TimestampFile.make_metadata
from_metadata = tuf.formats.TimestampFile.from_metadata
TIMESTAMP_SCHEMA = tuf.formats.TIMESTAMP_SCHEMA
self.assertTrue(TIMESTAMP_SCHEMA.matches(make_metadata(version, expires,
filedict)))
metadata = make_metadata(version, expires, filedict)
versiondict)))
metadata = make_metadata(version, expires, versiondict)
self.assertTrue(isinstance(from_metadata(metadata), tuf.formats.TimestampFile))
# Test conditions for invalid arguments.
bad_version = 'eight'
bad_expires = '2000'
bad_filedict = 123
bad_versiondict = 123
self.assertRaises(tuf.FormatError, make_metadata, bad_version,
expires, filedict)
expires, versiondict)
self.assertRaises(tuf.FormatError, make_metadata, version,
bad_expires, filedict)
bad_expires, versiondict)
self.assertRaises(tuf.FormatError, make_metadata, version,
expires, bad_filedict)
expires, bad_versiondict)
self.assertRaises(tuf.FormatError, from_metadata, 123)
@ -345,6 +340,8 @@ def test_RootFile(self):
roledict = {'root': {'keyids': ['123abc'],
'threshold': 1,
'paths': ['path1/', 'path2']}}
compression_algorithms = ['gz']
make_metadata = tuf.formats.RootFile.make_metadata
from_metadata = tuf.formats.RootFile.from_metadata
@ -352,9 +349,10 @@ def test_RootFile(self):
self.assertTrue(ROOT_SCHEMA.matches(make_metadata(version, expires,
keydict, roledict,
consistent_snapshot)))
consistent_snapshot,
compression_algorithms)))
metadata = make_metadata(version, expires, keydict, roledict,
consistent_snapshot)
consistent_snapshot, compression_algorithms)
self.assertTrue(isinstance(from_metadata(metadata), tuf.formats.RootFile))
# Test conditions for invalid arguments.
@ -362,23 +360,28 @@ def test_RootFile(self):
bad_expires = 'eight'
bad_keydict = 123
bad_roledict = 123
bad_compression_algorithms = 'nozip'
self.assertRaises(tuf.FormatError, make_metadata, bad_version,
expires,
keydict, roledict,
consistent_snapshot)
consistent_snapshot,
compression_algorithms)
self.assertRaises(tuf.FormatError, make_metadata, version,
bad_expires,
keydict, roledict,
consistent_snapshot)
consistent_snapshot,
compression_algorithms)
self.assertRaises(tuf.FormatError, make_metadata, version,
expires,
bad_keydict, roledict,
consistent_snapshot)
consistent_snapshot,
compression_algorithms)
self.assertRaises(tuf.FormatError, make_metadata, version,
expires,
keydict, bad_roledict,
consistent_snapshot)
consistent_snapshot,
compression_algorithms)
self.assertRaises(tuf.FormatError, from_metadata, 'bad')
@ -388,30 +391,27 @@ def test_SnapshotFile(self):
# Test conditions for valid instances of 'tuf.formats.SnapshotFile'.
version = 8
expires = '1985-10-21T13:20:00Z'
filedict = {'metadata/snapshot.json': {'length': 1024,
'hashes': {'sha256': 'ABCD123'},
'custom': {'type': 'metadata'}}}
versiondict = {'targets.json' : {'version': version}}
make_metadata = tuf.formats.SnapshotFile.make_metadata
from_metadata = tuf.formats.SnapshotFile.from_metadata
SNAPSHOT_SCHEMA = tuf.formats.SNAPSHOT_SCHEMA
self.assertTrue(SNAPSHOT_SCHEMA.matches(make_metadata(version, expires,
filedict)))
metadata = make_metadata(version, expires, filedict)
versiondict)))
metadata = make_metadata(version, expires, versiondict)
self.assertTrue(isinstance(from_metadata(metadata), tuf.formats.SnapshotFile))
# Test conditions for invalid arguments.
bad_version = '8'
bad_expires = '2000'
bad_filedict = 123
bad_versiondict = 123
self.assertRaises(tuf.FormatError, make_metadata, version,
expires, bad_filedict)
expires, bad_versiondict)
self.assertRaises(tuf.FormatError, make_metadata, bad_version, expires,
filedict)
versiondict)
self.assertRaises(tuf.FormatError, make_metadata, version, bad_expires,
bad_filedict)
bad_versiondict)
self.assertRaises(tuf.FormatError, from_metadata, 123)
@ -548,6 +548,7 @@ def test_make_signable(self):
root = {'_type': 'Root',
'version': 8,
'consistent_snapshot': False,
'compression_algorithms': ['gz'],
'expires': '1985-10-21T13:20:00Z',
'keys': {'123abc': {'keytype': 'rsa',
'keyval': {'public': 'pubkey',
@ -679,6 +680,7 @@ def test_check_signable_object_format(self):
root = {'_type': 'Root',
'version': 8,
'consistent_snapshot': False,
'compression_algorithms': ['gz'],
'expires': '1985-10-21T13:20:00Z',
'keys': {'123abc': {'keytype': 'rsa',
'keyval': {'public': 'pubkey',

View file

@ -190,11 +190,13 @@ def test_create_keydb_from_root_metadata(self):
version = 8
consistent_snapshot = False
expires = '1985-10-21T01:21:00Z'
compression_algorithms = ['gz']
root_metadata = tuf.formats.RootFile.make_metadata(version,
expires,
keydict, roledict,
consistent_snapshot)
consistent_snapshot,
compression_algorithms)
self.assertEqual(None, tuf.keydb.create_keydb_from_root_metadata(root_metadata))
tuf.keydb.create_keydb_from_root_metadata(root_metadata)
@ -230,11 +232,13 @@ def test_create_keydb_from_root_metadata(self):
keydict[keyid3] = rsakey3
version = 8
expires = '1985-10-21T01:21:00Z'
compression_algorithms = ['gz']
root_metadata = tuf.formats.RootFile.make_metadata(version,
expires,
keydict, roledict,
consistent_snapshot)
consistent_snapshot,
compression_algorithms)
self.assertEqual(None, tuf.keydb.create_keydb_from_root_metadata(root_metadata))
# Ensure only 'keyid2' was added to the keydb database. 'keyid' and

View file

@ -248,7 +248,7 @@ def test_with_tuf(self):
# Verify that 'timestamp.json' is the culprit.
self.assertEqual(url_file, mirror_url)
self.assertTrue(isinstance(mirror_error, tuf.BadHashError))
self.assertTrue(isinstance(mirror_error, tuf.BadVersionNumberError))
else:
self.fail('TUF did not prevent a mix-and-match attack.')

View file

@ -289,7 +289,7 @@ def test_with_tuf(self):
# version.
repository.timestamp.expiration = datetime.datetime(2030, 1, 1, 12, 12)
repository.write()
# Move the staged metadata to the "live" metadata.
shutil.rmtree(os.path.join(self.repository_directory, 'metadata'))
shutil.copytree(os.path.join(self.repository_directory, 'metadata.staged'),
@ -315,6 +315,7 @@ def test_with_tuf(self):
# Restore the previous version of 'timestamp.json' on the remote repository
# and verify that the non-TUF client downloads it (expected, but not ideal).
shutil.move(backup_timestamp, timestamp_path)
logger.info('Moving the backup timestamp to the current version.')
# Verify that the TUF client detects replayed metadata and refuses to
# continue the update process.

View file

@ -620,27 +620,20 @@ def test_generate_timestamp_metadata(self):
version = 1
expiration_date = '1985-10-21T13:20:00Z'
compression_algorithms = ['gz']
snapshot_metadata = \
repo_lib.generate_timestamp_metadata(snapshot_filename, version,
expiration_date,
compression_algorithms)
expiration_date)
self.assertTrue(tuf.formats.TIMESTAMP_SCHEMA.matches(snapshot_metadata))
# Test improperly formatted arguments.
self.assertRaises(tuf.FormatError, repo_lib.generate_timestamp_metadata,
3, version, expiration_date, compression_algorithms)
3, version, expiration_date)
self.assertRaises(tuf.FormatError, repo_lib.generate_timestamp_metadata,
snapshot_filename, '3', expiration_date,
compression_algorithms)
snapshot_filename, '3', expiration_date)
self.assertRaises(tuf.FormatError, repo_lib.generate_timestamp_metadata,
snapshot_filename, version, '3', compression_algorithms)
self.assertRaises(tuf.FormatError, repo_lib.generate_timestamp_metadata,
snapshot_filename, version, expiration_date, 3)
self.assertRaises(tuf.FormatError, repo_lib.generate_timestamp_metadata,
snapshot_filename, version, expiration_date, ['compress'])
snapshot_filename, version, '3')

View file

@ -329,12 +329,14 @@ def test_create_roledb_from_root_metadata(self):
'targets': {'keyids': [keyid2], 'threshold': 1}}
version = 8
consistent_snapshot = False
expires = '1985-10-21T01:21:00Z'
expires = '1985-10-21T01:21:00Z'
compression_algorithms = ['gz']
root_metadata = tuf.formats.RootFile.make_metadata(version,
expires,
keydict, roledict,
consistent_snapshot)
consistent_snapshot,
compression_algorithms)
self.assertEqual(None,
tuf.roledb.create_roledb_from_root_metadata(root_metadata))
# Ensure 'Root' and 'Targets' were added to the role database.
@ -372,7 +374,8 @@ def test_create_roledb_from_root_metadata(self):
root_metadata = tuf.formats.RootFile.make_metadata(version,
expires,
keydict, roledict,
consistent_snapshot)
consistent_snapshot,
compression_algorithms)
self.assertRaises(tuf.Error,
tuf.roledb.create_roledb_from_root_metadata, root_metadata)
# Remove the invalid role and re-generate 'root_metadata' to test for the
@ -381,7 +384,8 @@ def test_create_roledb_from_root_metadata(self):
root_metadata = tuf.formats.RootFile.make_metadata(version,
expires,
keydict, roledict,
consistent_snapshot)
consistent_snapshot,
compression_algorithms)
self.assertEqual(None,
tuf.roledb.create_roledb_from_root_metadata(root_metadata))

View file

@ -547,7 +547,10 @@ def test_3__update_metadata(self):
# This is the default metadata that we would create for the timestamp role,
# because it has no signed metadata for itself.
DEFAULT_TIMESTAMP_FILELENGTH = tuf.conf.DEFAULT_TIMESTAMP_REQUIRED_LENGTH
# This is the the upper bound length for Targets metadata.
DEFAULT_TARGETS_FILELENGTH = tuf.conf.DEFAULT_TARGETS_REQUIRED_LENGTH
# Save the versioninfo of 'targets.json,' needed later when re-installing
# with _update_metadata().
targets_versioninfo = \
@ -557,7 +560,9 @@ def test_3__update_metadata(self):
# Remove the currently installed metadata from the store and disk. Verify
# that the metadata dictionary is re-populated after calling
# _update_metadata().
self.repository_updater.metadata['current'].clear()
del self.repository_updater.metadata['current']['timestamp']
del self.repository_updater.metadata['current']['targets']
timestamp_filepath = \
os.path.join(self.client_metadata_current, 'timestamp.json')
targets_filepath = os.path.join(self.client_metadata_current, 'targets.json')
@ -575,7 +580,9 @@ def test_3__update_metadata(self):
# Verify 'targets.json' is properly installed.
self.assertFalse('targets' in self.repository_updater.metadata['current'])
self.repository_updater._update_metadata('targets', targets_versioninfo)
self.repository_updater._update_metadata('targets',
DEFAULT_TARGETS_FILELENGTH,
targets_versioninfo['version'])
self.assertTrue('targets' in self.repository_updater.metadata['current'])
targets_signable = tuf.util.load_json_file(targets_filepath)
@ -590,45 +597,47 @@ def test_3__update_metadata(self):
# Verify 'targets.json.gz' is properly intalled. Note: The uncompressed
# version is installed if the compressed one is downloaded.
self.assertFalse('targets' in self.repository_updater.metadata['current'])
self.repository_updater._update_metadata('targets', targets_fileinfo, 'gzip',
targets_compressed_fileinfo)
self.repository_updater._update_metadata('targets',
DEFAULT_TARGETS_FILELENGTH,
targets_versioninfo['version'],
'gzip')
self.assertTrue('targets' in self.repository_updater.metadata['current'])
length, hashes = tuf.util.get_file_details(targets_filepath)
self.assertEqual(targets_fileinfo, tuf.formats.make_fileinfo(length, hashes))
self.assertEqual(targets_versioninfo['version'],
self.repository_updater.metadata['current']['targets']['version'])
# Test: Invalid fileinfo.
# Invalid fileinfo for the uncompressed version of 'targets.json'.
# Test: Invalid version numbers.
# Invalid version number for the uncompressed version of 'targets.json'.
self.assertRaises(tuf.NoWorkingMirrorError,
self.repository_updater._update_metadata,
'targets', targets_compressed_fileinfo)
'targets', DEFAULT_TARGETS_FILELENGTH, 88)
# Verify that the specific exception raised is correct for the previous
# case.
try:
self.repository_updater._update_metadata('targets',
targets_compressed_fileinfo)
DEFAULT_TARGETS_FILELENGTH, 88)
except tuf.NoWorkingMirrorError as e:
for mirror_error in six.itervalues(e.mirror_errors):
assert isinstance(mirror_error, tuf.BadHashError)
assert isinstance(mirror_error, tuf.BadVersionNumberError)
# Invalid fileinfo for the compressed version of 'targets.json'
# Invalid version number for the compressed version of 'targets.json'
self.assertRaises(tuf.NoWorkingMirrorError,
self.repository_updater._update_metadata,
'targets', targets_compressed_fileinfo, 'gzip',
targets_fileinfo)
'targets', DEFAULT_TARGETS_FILELENGTH, 88,
'gzip')
# Verify that the specific exception raised is correct for the previous
# case. The length is checked before the hashes, so the specific error in
# case. The version number is checked before the hashes, so the specific error in
# this case should be 'tuf.DownloadLengthMismatchError'.
try:
self.repository_updater._update_metadata('targets',
targets_compressed_fileinfo,
'gzip', targets_fileinfo)
DEFAULT_TARGETS_FILELENGTH,
88, 'gzip')
except tuf.NoWorkingMirrorError as e:
for mirror_error in six.itervalues(e.mirror_errors):
assert isinstance(mirror_error, tuf.DownloadLengthMismatchError)
assert isinstance(mirror_error, tuf.BadVersionNumberError)
@ -652,7 +661,6 @@ def test_3__update_metadata_if_changed(self):
# Verify the current version of 'targets.json' has not changed.
self.assertEqual(self.repository_updater.metadata['current']['targets']['version'], 1)
# Modify one target file on the remote repository.
repository = repo_tool.load_repository(self.repository_directory)
target3 = os.path.join(self.repository_directory, 'targets', 'file3.txt')
@ -668,12 +676,12 @@ def test_3__update_metadata_if_changed(self):
shutil.copytree(os.path.join(self.repository_directory, 'metadata.staged'),
os.path.join(self.repository_directory, 'metadata'))
# Update 'targets.json' and verify that the client's current 'targets.json'
# has been updated. 'timestamp' and 'snapshot' must be manually updated
# so that new 'targets' may be recognized.
DEFAULT_TIMESTAMP_FILELENGTH = tuf.conf.DEFAULT_TIMESTAMP_REQUIRED_LENGTH
logger.info('Attempting to increment targets to version 2...')
self.repository_updater._update_metadata('timestamp', DEFAULT_TIMESTAMP_FILELENGTH)
self.repository_updater._update_metadata_if_changed('snapshot', 'timestamp')
self.repository_updater._update_metadata_if_changed('targets')
@ -1013,68 +1021,65 @@ def test_6_download_target(self):
target_filepaths = \
list(self.repository_updater.metadata['current']['targets']['targets'].keys())
# Test: normal case.
# Get the target info, which is an argument to 'download_target()'.
for target_filepath in target_filepaths:
target_fileinfo = self.repository_updater.target(target_filepath)
self.repository_updater.download_target(target_fileinfo,
destination_directory)
# 'target_filepaths' is expected to have at least two targets. The first
# target will be used to test against download_target(). The second
# will be used to test against download_target() and a repository with
# consistent snapshots.
target_filepath1 = target_filepaths.pop()
target_fileinfo = self.repository_updater.target(target_filepath1)
self.repository_updater.download_target(target_fileinfo,
destination_directory)
download_filepath = \
os.path.join(destination_directory, target_filepath.lstrip('/'))
self.assertTrue(os.path.exists(download_filepath))
length, hashes = tuf.util.get_file_details(download_filepath)
download_targetfileinfo = tuf.formats.make_fileinfo(length, hashes)
download_filepath = \
os.path.join(destination_directory, target_filepath1.lstrip('/'))
self.assertTrue(os.path.exists(download_filepath))
length, hashes = tuf.util.get_file_details(download_filepath)
download_targetfileinfo = tuf.formats.make_fileinfo(length, hashes)
# Add any 'custom' data from the repository's target fileinfo to the
# 'download_targetfileinfo' object being tested.
if 'custom' in target_fileinfo['fileinfo']:
download_targetfileinfo['custom'] = target_fileinfo['fileinfo']['custom']
self.assertEqual(target_fileinfo['fileinfo'], download_targetfileinfo)
# Test when consistent snapshots is set. First, create a valid
# repository with consistent snapshots set (root.json contains a
# "consistent_snapshot" entry that the updater uses to correctly fetch
# snapshots. The updater expects the existence of
# <version_number>.filename files if root.json sets 'consistent_snapshot
# = True'.
# The repository must be rewritten with consistent snapshots set.
repository = repo_tool.load_repository(self.repository_directory)
repository.root.load_signing_key(self.role_keys['root']['private'])
repository.snapshot.load_signing_key(self.role_keys['snapshot']['private'])
repository.timestamp.load_signing_key(self.role_keys['timestamp']['private'])
# Add any 'custom' data from the repository's target fileinfo to the
# 'download_targetfileinfo' object being tested.
if 'custom' in target_fileinfo['fileinfo']:
download_targetfileinfo['custom'] = target_fileinfo['fileinfo']['custom']
self.assertEqual(target_fileinfo['fileinfo'], download_targetfileinfo)
# Test when consistent snapshots is set. First, create a valid
# repository with consistent snapshots set (root.json contains a
# "consistent_snapshots" entry that the updater uses to correctly fetch
# snapshots. The updater expects the existence of <hash>.filename files
# if root.json sets 'consistent_snapshot = True'.
# The repository must be rewritten with consistent snapshots set.
repository = repo_tool.load_repository(self.repository_directory)
repository.root.load_signing_key(self.role_keys['root']['private'])
repository.snapshot.load_signing_key(self.role_keys['snapshot']['private'])
repository.timestamp.load_signing_key(self.role_keys['timestamp']['private'])
repository.write(consistent_snapshot=True)
# Move the staged metadata to the "live" metadata.
shutil.rmtree(os.path.join(self.repository_directory, 'metadata'))
shutil.copytree(os.path.join(self.repository_directory, 'metadata.staged'),
os.path.join(self.repository_directory, 'metadata'))
self.repository_updater.refresh()
# self.repository_updater.consistent_snapshot = True
self.repository_updater.download_target(target_fileinfo,
destination_directory)
repository.write(consistent_snapshot=True)
# Move the staged metadata to the "live" metadata.
shutil.rmtree(os.path.join(self.repository_directory, 'metadata'))
shutil.copytree(os.path.join(self.repository_directory, 'metadata.staged'),
os.path.join(self.repository_directory, 'metadata'))
self.repository_updater.refresh()
target_filepath2 = target_filepaths.pop()
target_fileinfo2 = self.repository_updater.target(target_filepath2)
self.repository_updater.download_target(target_fileinfo2,
destination_directory)
# Test: Invalid arguments.
self.assertRaises(tuf.FormatError, self.repository_updater.download_target,
8, destination_directory)
random_target_filepath = target_filepaths.pop()
target_fileinfo = self.repository_updater.target(random_target_filepath)
self.assertRaises(tuf.FormatError, self.repository_updater.download_target,
target_fileinfo, 8)
# Non-existent destination.
# TODO: test for non-existent directories.
"""
self.assertRaises(tuf.Error, self.repository_updater.download_target,
target_fileinfo, 'non-existent/bad_path')
"""
# Test:
# Attempt a file download of a valid target, however, a download exception
# occurs because the target is not within the mirror's confined target
@ -1100,15 +1105,16 @@ def test_6_download_target(self):
def test_7_updated_targets(self):
# Verify that list contains all files that need to be updated, these
# files include modified and new target files. Also, confirm that files
# than need not to be updated are absent from the list.
# Verify that the list of targets returned by updated_targets() contains
# all the files that need to be updated, these files include modified and
# new target files. Also, confirm that files than need not to be updated
# are absent from the list.
# Setup
# Create temporary directory which will hold client's target files.
destination_directory = self.make_temp_directory()
# Get the list of target files. It will be used as an argument to
# 'updated_targets' function.
# Get the list of target files. It will be used as an argument to the
# 'updated_targets()' function.
all_targets = self.repository_updater.all_targets()
# Test for duplicates and targets in the root directory of the repository.
@ -1161,9 +1167,18 @@ def test_7_updated_targets(self):
target1 = os.path.join(self.repository_directory, 'targets', 'file1.txt')
repository.targets.remove_target(target1)
length, hashes = tuf.util.get_file_details(target1)
repository.targets.add_target(target1)
repository.targets.load_signing_key(self.role_keys['targets']['private'])
repository.snapshot.load_signing_key(self.role_keys['snapshot']['private'])
with open(target1, 'a') as file_object:
file_object.write('append extra text')
length, hashes = tuf.util.get_file_details(target1)
repository.targets.add_target(target1)
repository.targets.load_signing_key(self.role_keys['targets']['private'])
repository.snapshot.load_signing_key(self.role_keys['snapshot']['private'])
@ -1175,7 +1190,8 @@ def test_7_updated_targets(self):
shutil.copytree(os.path.join(self.repository_directory, 'metadata.staged'),
os.path.join(self.repository_directory, 'metadata'))
# Ensure the client has the up-to-date metadata.
# Ensure the client has up-to-date metadata.
logger.info('refreshing top-level metadata after updating targets.json..')
self.repository_updater.refresh()
# Verify that the new target file is considered updated.

View file

@ -921,10 +921,13 @@ def _verify_uncompressed_metadata_file(self, metadata_file_object,
# Is 'metadata_signable' expired?
self._ensure_not_expired(metadata_signable['signed'], metadata_role)
"""
# Is 'metadata_signable' newer than the currently installed
# version?
current_metadata_role = self.metadata['current'].get(metadata_role)
# Compare metadata version numbers. Ensure there is a current
# version of the metadata role to be updated.
if current_metadata_role is not None:
@ -933,6 +936,12 @@ def _verify_uncompressed_metadata_file(self, metadata_file_object,
if downloaded_version < current_version:
raise tuf.ReplayedMetadataError(metadata_role, downloaded_version,
current_version)
else:
logger.info('current_version >= downloaded_version')
else:
logger.info('current_metadata_role is None')
"""
# Reject the metadata if any specified targets are not allowed.
# 'tuf.ForbiddenTargetError' raised if any of the targets of 'metadata_role'
@ -1114,29 +1123,44 @@ def _get_metadata_file(self, metadata_role, remote_filename,
# Verify 'file_object' according to the callable function.
# 'file_object' is also verified if decompressed above (i.e., the
# uncompressed version).
metadata_signable = \
tuf.util.load_json_string(file_object.read().decode('utf-8'))
# If the version number is unspecified, ensure that the version number
# downloaded is greater than the currently trusted version number.
# downloaded is greater than the currently trusted version number for
# 'metadata_role'.
version_downloaded = metadata_signable['signed']['version']
if expected_version is None:
if version_downloaded <= expected_version:
message = \
'Downloaded version number: ' + repr(version_downloaded) + '.' \
' Version number MUST be greater than: ' + repr(expected_version)
raise tuf.BadVersionNumberError(message)
# Otherwise, verify that the downloaded version matches the version
# requested.
else:
if expected_version is not None:
# Verify that the downloaded version matches the version expected by
# the caller.
if version_downloaded != expected_version:
message = \
'Downloaded version number: ' + repr(version_downloaded) + '.' \
' Version number MUST be: ' + repr(expected_version)
raise tuf.BadVersionNumberError(message)
# The caller does not know which version to download. Verify that the
# downloaded version is at least greater than the one locally available.
else:
# Verify that the version number of the locally stored
# 'timestamp.json', if available, is less than what was downloaded.
# Otherwise, accept the new timestamp with version number
# 'version_downloaded'.
logger.info('metadata_role: ' + repr(metadata_role))
try:
current_version = \
self.metadata['current'][metadata_role]['version']
if version_downloaded < current_version:
raise tuf.ReplayedMetadataError(metadata_role, version_downloaded,
current_version)
except KeyError:
logger.info(metadata_role + ' not available locally.')
self._verify_uncompressed_metadata_file(file_object, metadata_role)
except Exception as exception:
# Remember the error from this mirror, and "reset" the target file.
logger.exception('Update failed from ' + file_mirror + '.')
@ -1436,7 +1460,8 @@ def _update_metadata(self, metadata_role, upperbound_filelength, version=None,
filename_version = version
dirname, basename = os.path.split(remote_filename)
remote_filename = os.path.join(dirname, str(filename_version) + '.' + basename)
logger.info('Verifying ' + repr(metadata_role) + ' requesting version: ' + repr(version))
metadata_file_object = \
self._get_metadata_file(metadata_role, remote_filename,
upperbound_filelength, version, compression)
@ -1703,7 +1728,7 @@ def _versioninfo_has_changed(self, metadata_filename, new_versioninfo):
<Returns>
Boolean. True if the versioninfo has increased, false otherwise.
"""
# If there is no versioninfo currently stored for 'metadata_filename',
# try to load the file, calculate the versioninfo, and store it.
if metadata_filename not in self.versioninfo:
@ -1786,13 +1811,36 @@ def _update_versioninfo(self, metadata_filename):
trusted_versioninfo = \
self.metadata['current']['timestamp']['version']
# When updating snapshot.json, the client either (1) has a copy of
# snapshot.json, or (2) in the process of obtaining it by first downloading
# timestamp.json. Note: Clients may have only root.json and perform a
# refresh of top-level metadata to obtain the remaining roles.
elif metadata_filename == 'snapshot.json':
trusted_versioninfo = \
self.metadata['current']['timestamp']['meta']['snapshot.json']
# Verify the version number of the currently trusted snapshot.json in
# snapshot.json itself. Checking the version number specified in
# timestamp.json may be greater than the version specified in the
# client's copy of snapshot.json.
try:
timestamp_version_number = self.metadata['current']['snapshot']['version']
trusted_versioninfo = tuf.formats.make_versioninfo(timestamp_version_number)
except KeyError:
trusted_versioninfo = \
self.metadata['current']['timestamp']['meta']['snapshot.json']
else:
trusted_versioninfo = \
self.metadata['current']['snapshot']['meta'][metadata_filename]
try:
# The metadata file names in 'self.metadata' exclude the role
# extension. Strip the '.json' extension when checking if
# 'metadata_filename' currently exists.
targets_version_number = self.metadata['current'][metadata_filename[:-len('.json')]]['version']
trusted_versioninfo = tuf.formats.make_versioninfo(targets_version_number)
except KeyError:
trusted_versioninfo = \
self.metadata['current']['snapshot']['meta'][metadata_filenamed]
self.versioninfo[metadata_filename] = trusted_versioninfo
@ -2841,20 +2889,15 @@ def download_target(self, target, destination_directory):
target_filepath.lstrip(os.sep))
destination = os.path.abspath(destination)
target_dirpath = os.path.dirname(destination)
if target_dirpath:
try:
os.makedirs(target_dirpath)
except OSError as e:
if e.errno == errno.EEXIST:
pass
else:
raise
else:
message = repr(target_dirpath) + ' does not exist.'
logger.warning(message)
raise tuf.Error(message)
try:
os.makedirs(target_dirpath)
except OSError as e:
if e.errno == errno.EEXIST:
pass
else:
raise
target_file_object.move(destination)

View file

@ -526,7 +526,8 @@ def _generate_and_write_metadata(rolename, metadata_filename, write_partial,
if tuf.sig.verify(signable, rolename) or write_partial:
_remove_invalid_and_duplicate_signatures(signable)
compressions = roleinfo['compressions']
filename = write_metadata_file(signable, metadata_filename, compressions,
filename = write_metadata_file(signable, metadata_filename,
metadata['version'], compressions,
False)
# 'signable' contains an invalid threshold of signatures.

View file

@ -121,7 +121,6 @@ def _generate_and_write_metadata(rolename, metadata_filename, write_partial,
# Retrieve the roleinfo of 'rolename' to extract the needed metadata
# attributes, such as version number, expiration, etc.
roleinfo = tuf.roledb.get_roleinfo(rolename)
snapshot_compressions = tuf.roledb.get_roleinfo('snapshot')['compressions']
# Generate the appropriate role metadata for 'rolename'.
if rolename == 'root':
@ -140,6 +139,7 @@ def _generate_and_write_metadata(rolename, metadata_filename, write_partial,
roleinfo['expires'],
roleinfo['delegations'],
consistent_snapshot)
if rolename == 'targets':
_log_warning_if_expires_soon(TARGETS_FILENAME, roleinfo['expires'],
TARGETS_EXPIRES_WARN_SECONDS)
@ -152,6 +152,7 @@ def _generate_and_write_metadata(rolename, metadata_filename, write_partial,
roleinfo['expires'], root_filename,
targets_filename,
consistent_snapshot)
_log_warning_if_expires_soon(SNAPSHOT_FILENAME, roleinfo['expires'],
SNAPSHOT_EXPIRES_WARN_SECONDS)
@ -160,8 +161,7 @@ def _generate_and_write_metadata(rolename, metadata_filename, write_partial,
snapshot_filename = filenames['snapshot']
metadata = generate_timestamp_metadata(snapshot_filename,
roleinfo['version'],
roleinfo['expires'],
snapshot_compressions)
roleinfo['expires'])
_log_warning_if_expires_soon(TIMESTAMP_FILENAME, roleinfo['expires'],
TIMESTAMP_EXPIRES_WARN_SECONDS)
@ -205,9 +205,9 @@ def _generate_and_write_metadata(rolename, metadata_filename, write_partial,
metadata['version'], compressions,
consistent_snapshot)
# The root and timestamp files should also be written without a digest if
# 'consistent_snaptshots' is True. Client may request a timestamp and root
# file without knowing its digest and file size.
# The root and timestamp files should also be written without a version
# number prepended if 'consistent_snaptshots' is True. Client may request
# a timestamp and root file without knowing its version number.
if rolename == 'root' or rolename == 'timestamp':
write_metadata_file(signable, metadata_filename, metadata['version'],
compressions, consistent_snapshot=False)
@ -218,7 +218,7 @@ def _generate_and_write_metadata(rolename, metadata_filename, write_partial,
message = 'Not enough signatures for ' + repr(metadata_filename)
raise tuf.UnsignedMetadataError(message, signable)
return signable, filename
return signable, filename
@ -578,7 +578,7 @@ def _load_top_level_metadata(repository, top_level_filenames):
consistent_snapshot = root_metadata['consistent_snapshot']
else:
message = 'Cannot load the required root file: '+repr(root_filename)
message = 'Cannot load the required root file: ' + repr(root_filename)
raise tuf.RepositoryError(message)
# Load 'timestamp.json'. A Timestamp role file without a version number is
@ -1542,7 +1542,7 @@ def generate_targets_metadata(targets_directory, target_files, version,
# Note: join() discards 'targets_directory' if 'target' contains a leading
# path separator (i.e., is treated as an absolute path).
target_path = os.path.join(targets_directory, target.lstrip(os.sep))
# Ensure all target files listed in 'target_files' exist. If just one of
# these files does not exist, raise an exception.
if not os.path.exists(target_path):
@ -1711,7 +1711,7 @@ def generate_snapshot_metadata(metadata_directory, version, expiration_date,
def generate_timestamp_metadata(snapshot_filename, version,
expiration_date, compressions=()):
expiration_date):
"""
<Purpose>
Generate the timestamp metadata object. The 'snapshot.json' file must
@ -1755,7 +1755,6 @@ def generate_timestamp_metadata(snapshot_filename, version,
tuf.formats.PATH_SCHEMA.check_match(snapshot_filename)
tuf.formats.METADATAVERSION_SCHEMA.check_match(version)
tuf.formats.ISO8601_DATETIME_SCHEMA.check_match(expiration_date)
tuf.formats.COMPRESSIONS_SCHEMA.check_match(compressions)
# Retrieve the versioninfo of the Snapshot metadata file.
versioninfo = {}
@ -1764,8 +1763,8 @@ def generate_timestamp_metadata(snapshot_filename, version,
# Save the versioninfo of the compressed versions of 'timestamp.json'
# in 'versioninfo'. Log the files included in 'fileinfo'.
# TODO: Since version numbers are now stored, the version numbers of
# compressed roles do not change and can thus be excluded. Remove this
# after testing.
# compressed roles do not change and can thus be excluded. Remove the
# following commented lines after testing.
"""
for file_extension in compressions:
if not len(file_extension):

View file

@ -197,10 +197,10 @@ def write(self, write_partial=False, consistent_snapshot=False,
consistent_snapshot:
A boolean indicating whether written metadata and target files should
include a digest in the filename (i.e., <digest>.root.json,
<digest>.targets.json.gz, <digest>.README.json, where <digest> is the
file's SHA256 digest. Example:
1f4e35a60c8f96d439e27e858ce2869c770c1cdd54e1ef76657ceaaf01da18a3.root.json'
include a version number in the filename (i.e.,
<version_number>.root.json, <version_number>.targets.json.gz,
<version_number>.README.json, where <version_number> is the file's
SHA256 digest. Example: 13.root.json'
compressions:
A list of compression algorithms. Each of these algorithms will be
@ -271,7 +271,7 @@ def write(self, write_partial=False, consistent_snapshot=False,
repo_lib._generate_and_write_metadata('root', root_filename, write_partial,
self._targets_directory,
self._metadata_directory,
consistent_snapshot, compressions)
consistent_snapshot)
# Generate the 'targets.json' metadata file.
targets_filename = repo_lib.TARGETS_FILENAME
@ -2704,9 +2704,9 @@ def load_repository(repository_directory):
filenames = repo_lib.get_metadata_filenames(metadata_directory)
# The Root file is always available without a consistent snapshots digest
# attached to the filename. Store the 'consistent_snapshot' value read the
# loaded Root file so that other metadata files may be located.
# The Root file is always available without a consistent snapshots version
# number attached to the filename. Store the 'consistent_snapshot' value
# read the loaded Root file so that other metadata files may be located.
# 'consistent_snapshot' value.
consistent_snapshot = False
@ -2747,10 +2747,10 @@ def load_repository(repository_directory):
else:
continue
# Keep a store metadata previously loaded metadata to prevent
# re-loading duplicate versions. Duplicate versions may occur with
# consistent_snapshot, where the same metadata may be available in
# 'consistent_snapshot', where the same metadata may be available in
# multiples files (the different hash is included in each filename.
if metadata_name in loaded_metadata:
continue
@ -2766,6 +2766,8 @@ def load_repository(repository_directory):
# Extract the metadata attributes 'metadata_name' and update its
# corresponding roleinfo.
# TODO: Test for detection of the Targets role.
roleinfo = tuf.roledb.get_roleinfo(metadata_name)
roleinfo['signatures'].extend(signable['signatures'])
roleinfo['version'] = metadata_object['version']