[WIP] Refactor test_updater.py

Continue refactoring the test cases of test_updater.py.
Fix issue where repository_tool.py was not writing new compressed metadata.
Minor edits to TUF modules.
This commit is contained in:
Vladimir Diaz 2014-03-13 12:31:36 -04:00
parent 5e7713a93f
commit eaee52e14e
6 changed files with 267 additions and 233 deletions

View file

@ -9,28 +9,29 @@
<Started>
October 15, 2012.
March 11, 2014. Refactored to avoid mocking, and to use exact repositories
and realistic retrieval of files. -vladimir.v.diaz
March 11, 2014. Refactored to avoid mocking, use exact repositories, and
add realistic retrieval of files. -vladimir.v.diaz
<Copyright>
See LICENSE for licensing information.
<Purpose>
test_updater.py provides a collection of methods that test all the methods
and functions of 'tuf.client.updater.py'.
'test_updater.py' provides a collection of methods that test the public
and non-public methods and functions and functions of 'tuf.client.updater.py'.
The 'unittest_toolbox.py' module was created to provide additional testing
tools. For more info see 'unittest_toolbox.py'.
tools, such as automatically deleting temporary files created in test cases.
For more information, see 'tuf/tests/unittest_toolbox.py'.
<Methodology>
Test cases here should follow a specific order (i.e., independent methods are
tested prior to dependent methods). More accurately, least dependent methods
tested before dependent methods). More accurately, least dependent methods
are tested before most dependent methods. There is no reason to rewrite or
construct other methods that replicate already-tested methods solely for
testing purposes. This is possible because the 'unittest.TestCase' class
guarantees the order of unit tests. The 'test_something_A' method would
be tested before 'test_something_B'. To ensure the expected order of tests,
a number is be placed after 'test' and before methods name like so:
a number is placed after 'test' and before methods name like so:
'test_1_check_directory'. The number is a measure of dependence, where 1 is
less dependent than 2.
"""
@ -305,7 +306,6 @@ def test_1__init__exceptions(self):
# Restore the client's 'root.json file.
shutil.move(backup_root_file, client_root_file)
# Test: Normal 'tuf.client.updater.Updater' instantiation.
updater.Updater('test_repository', self.repository_mirrors)
@ -538,13 +538,10 @@ def test_2__ensure_not_expired(self):
def test_3__update_metadata(self):
"""
_update_metadata() downloads, verifies, and installs the specified metadata
role. Remove knowledge of currently installed metadata and verify that
they are re-installed after calling _update_metadata().
"""
# Setup
# Setup
# _update_metadata() downloads, verifies, and installs the specified
# metadata role. Remove knowledge of currently installed metadata and
# verify that they are re-installed after calling _update_metadata().
# Remove the installed metadata. _update_metadata() will be called to
# ensure the removed metadata is properly re-installed.
@ -738,58 +735,75 @@ def test_3__update_metadata_if_changed(self):
def test_3__targets_of_role(self):
# Setup
targets_dir_content = os.listdir(self.targets_dir)
# Test: normal case.
targets_list = self.Repository._targets_of_role('targets')
# Setup.
# Extract the list of targets from 'targets.json', to be compared to what
# is returned by _targets_of_role('targets').
targets_in_metadata = \
self.repository_updater.metadata['current']['targets']['targets']
# Verify that list of targets was returned,
# and that it contains valid target file.
# Test: normal case.
targets_list = self.repository_updater._targets_of_role('targets')
# Verify that the list of targets was returned, and that it contains valid
# target files.
self.assertTrue(tuf.formats.TARGETFILES_SCHEMA.matches(targets_list))
targets_filepaths = []
for target in range(len(targets_list)):
targets_filepaths.append(targets_list[target]['filepath'])
for dir_target in targets_dir_content:
if dir_target.endswith('.json'):
self.assertTrue(dir_target in targets_filepaths)
for target in targets_list:
self.assertTrue((target['filepath'], target['fileinfo']) in targets_in_metadata.items())
def test_4_refresh(self):
# This unit test is based on adding an extra target file to the
# server and rebuilding all server-side metadata. All top-level metadata
# should be updated when the client calls refresh().
repository = repo_tool.load_repository(self.repository_directory)
target3 = os.path.join(self.repository_directory, 'targets', 'file3.txt')
repository.targets.add_target(target3)
repository.targets.load_signing_key(self.role_keys['targets']['private'])
repository.snapshot.load_signing_key(self.role_keys['snapshot']['private'])
repository.timestamp.load_signing_key(self.role_keys['timestamp']['private'])
repository.write()
# This unit test is based on adding an extra target file to the
# server and rebuilding all server-side metadata. When 'refresh'
# function is called by the client all top level metadata should
# be updated.
target_fullpath = self._add_file_to_directory(self.targets_dir)
target_relpath = os.path.split(target_fullpath)
# Reference 'self.Repository.metadata['current']['targets']'.
targets_meta = self.Repository.metadata['current']['targets']
self.assertFalse(target_relpath[1] in targets_meta['targets'].keys())
# Move the staged metadata to the "live" metadata.
shutil.rmtree(os.path.join(self.repository_directory, 'metadata'))
shutil.copytree(os.path.join(self.repository_directory, 'metadata.staged'),
os.path.join(self.repository_directory, 'metadata'))
# Rebuild metadata at the server side.
self._mock_download_url_to_tempfileobj(self.all_role_paths)
setup.build_server_repository(self.server_repo_dir, self.targets_dir)
# Reference 'self.Repository.metadata['current']['targets']'. Ensure
# 'target3' is not already specified.
targets_metadata = self.repository_updater.metadata['current']['targets']
self.assertFalse(target3 in targets_metadata['targets'].keys())
# Test: normal case.
self.Repository.refresh()
# Verify the expected version numbers of the roles to be modified.
self.assertTrue(self.repository_updater.metadata['current']['targets']\
['version'], 1)
self.assertTrue(self.repository_updater.metadata['current']['snapshot']\
['version'], 1)
self.assertTrue(self.repository_updater.metadata['current']['timestamp']\
['version'], 1)
# Verify that clients metadata was updated.
targets_meta = self.Repository.metadata['current']['targets']
self.assertTrue(target_relpath[1] in targets_meta['targets'].keys())
# Test: normal case. 'targes.json' should now specify 'target3', and the
# following top-level metadata should have also been updated:
# 'snapshot.json' and 'timestamp.json'.
self.repository_updater.refresh()
# Verify that the client's metadata was updated.
targets_metadata = self.repository_updater.metadata['current']['targets']
targets_directory = os.path.join(self.repository_directory, 'targets')
target3 = target3[len(targets_directory):]
self.assertTrue(target3 in targets_metadata['targets'].keys())
# Verify the expected version numbers of the updated roles.
self.assertTrue(self.repository_updater.metadata['current']['targets']\
['version'], 2)
self.assertTrue(self.repository_updater.metadata['current']['snapshot']\
['version'], 2)
self.assertTrue(self.repository_updater.metadata['current']['timestamp']\
['version'], 2)
# Restore server's repository to initial state.
self._remove_filepath(target_fullpath)
# Rebuild metadata at the server side.
self._mock_download_url_to_tempfileobj(self.all_role_paths)
setup.build_server_repository(self.server_repo_dir, self.targets_dir)
@ -844,96 +858,108 @@ def test_4__refresh_targets_metadata(self):
self.assertTrue(deleg_target_file2 in targets_list)
# Clean up.
self._remove_filepath(deleg_target_filepath2)
shutil.rmtree(os.path.join(self.server_repo_dir, 'metadata'))
shutil.rmtree(os.path.join(self.server_repo_dir, 'keystore'))
setup.build_server_repository(self.server_repo_dir, self.targets_dir)
def test_5_all_targets(self):
# As with '_refresh_targets_metadata()', tuf.roledb._roledb_dict
# has to be populated. The 'tuf.download.safe_download' method
# should be patched. The 'self.all_role_paths' argument is passed so that
# the top-level roles and delegations may be all "downloaded" when
# Repository.refresh() is called below. '_mock_download_url_to_tempfileobj'
# returns each filepath listed in 'self.all_role_paths' in the listed
# order.
self._mock_download_url_to_tempfileobj(self.all_role_paths)
setup.build_server_repository(self.server_repo_dir, self.targets_dir)
# Update top-level metadata.
self.Repository.refresh()
# Setup
# As with '_refresh_targets_metadata()',
# Update top-level metadata before calling one of the "targets" methods, as
# recommended by 'updater.py'.
self.repository_updater.refresh()
# Test: normal case.
all_targets = self.Repository.all_targets()
all_targets = self.repository_updater.all_targets()
# Verify format of 'all_targets', it should correspond to
# 'TARGETFILES_SCHEMA'.
# Verify format of 'all_targets', it should correspond to
# 'TARGETFILES_SCHEMA'.
self.assertTrue(tuf.formats.TARGETFILES_SCHEMA.matches(all_targets))
# Verify that there is a correct number of records in 'all_targets' list.
# On the repository there are 4 target files, 2 of which are delegated.
# The targets role lists all targets, for a total of 4. The two delegated
# roles each list 1 of the already listed targets in 'targets.json', for a
# total of 2 (the delegated targets are listed twice). The total number of
# targets in 'all_targets' should then be 6.
self.assertTrue(len(all_targets) is 6)
# Verify that there is a correct number of records in 'all_targets' list,
# and the expected filepaths specified in the metadata. On the targets
# directory of the repository, there should be 3 target files (2 of
# which are specified by 'targets.json'.) The delegated role 'targets/role1'
# specifies 1 target file. The expected total number targets in
# 'all_targets' should be 3.
self.assertTrue(len(all_targets) is 3)
target_filepaths = []
for target in all_targets:
target_filepaths.append(target['filepath'])
self.assertTrue('/file1.txt' in target_filepaths)
self.assertTrue('/file2.txt' in target_filepaths)
self.assertTrue('/file3.txt' in target_filepaths)
def test_5_targets_of_role(self):
# Setup
targets_dir_content = os.listdir(self.targets_dir)
# Remove knowledge of 'targets.json' from the metadata store.
self.repository_updater.metadata['current']['targets']
# Remove the metadata of the delegated roles.
#shutil.rmtree(os.path.join(self.client_metadata, 'targets'))
os.remove(os.path.join(self.client_metadata_current, 'targets.json'))
# Extract the target files specified by the delegated role, 'role1.json',
# as available on the server-side version of the role.
role1_filepath = os.path.join(self.repository_directory, 'metadata',
'targets', 'role1.json')
role1_signable = tuf.util.load_json_file(role1_filepath)
expected_targets = role1_signable['signed']['targets']
# Test: normal case.
targets_list = self.Repository.targets_of_role()
# Verify that list of targets was returned,
# and that it contains valid target file.
self.assertTrue(tuf.formats.TARGETFILES_SCHEMA.matches(targets_list))
targets_filepaths = []
for target in range(len(targets_list)):
targets_filepaths.append(targets_list[target]['filepath'])
for dir_target in targets_dir_content:
if dir_target.endswith('.json'):
self.assertTrue(dir_target in targets_filepaths)
targets_list = self.repository_updater.targets_of_role('targets/role1')
# Verify that the expected role files were downloaded and installed.
os.path.exists(os.path.join(self.client_metadata_current, 'targets.json'))
os.path.exists(os.path.join(self.client_metadata_current, 'targets',
'role1.json'))
self.assertTrue('targets' in self.repository_updater.metadata['current'])
self.assertTrue('targets/role1' in self.repository_updater.metadata['current'])
# Verify that list of targets was returned and that it contains valid
# target files.
self.assertTrue(tuf.formats.TARGETFILES_SCHEMA.matches(targets_list))
for target in targets_list:
self.assertTrue((target['filepath'], target['fileinfo']) in expected_targets.items())
# Test: Invalid arguments.
# targets_of_role() expected a string rolename.
self.assertRaises(tuf.FormatError, self.repository_updater.targets_of_role,
8)
self.assertRaises(tuf.UnknownRoleError, self.repository_updater.targets_of_role,
'unknown_rolename')
def test_6_target(self):
# Requirements: make sure roledb_dict is populated and
# tuf.download.safe_download function is patched.
# Setup
targets_dir_content = os.listdir(self.targets_dir)
# Reference 'self.Repository.metadata['current']['targets']['targets']
targets_field = self.Repository.metadata['current']['targets']['targets']
# Reference 'self.Repository.target' function.
target = self.Repository.target
# Test: normal case.
for _target in targets_dir_content:
if _target.endswith('.json'):
target_info = target(_target)
# Verify that 'target_info' corresponds to 'TARGETFILE_SCHEMA'.
self.assertTrue(tuf.formats.TARGETFILE_SCHEMA.matches(target_info))
# Extract the file information of the targets specified in 'targets.json'.
self.repository_updater.refresh()
targets_metadata = self.repository_updater.metadata['current']['targets']
target_files = targets_metadata['targets']
# Extract random target from 'target_files', which will be compared to what
# is returned by target(). Restore the popped target (dict value stored in
# the metadata store) so that it can be found later.
filepath, fileinfo = target_files.popitem()
target_files[filepath] = fileinfo
target_fileinfo = self.repository_updater.target(filepath)
self.assertTrue(tuf.formats.TARGETFILE_SCHEMA.matches(target_fileinfo))
self.assertEqual(target_fileinfo['filepath'], filepath)
self.assertEqual(target_fileinfo['fileinfo'], fileinfo)
# Test: invalid target path.
self.assertRaises(tuf.UnknownTargetError, target, self.random_path())
self.assertRaises(tuf.UnknownTargetError, self.repository_updater.target,
self.random_path())
@ -941,14 +967,13 @@ def test_6_target(self):
def test_6_download_target(self):
# 'tuf.download.safe_download' method should be patched.
target_rel_paths_src = self._get_list_of_target_paths(self.targets_dir)
# Create temporary directory that will be passed as an argument to the
# 'download_target' function as a targets destination directory.
dest_dir = self.make_temp_directory()
# Create temporary directory (destination directory of downloaded targets)
# that will be passed as an argument to the 'download_target()'.
destination_directory = self.make_temp_directory()
target_files = \
self.repository_updater.metadata['current']['targets']['targets']
test_target_file = target_files.pop()
# Test: normal case.
for file_path in target_rel_paths_src:

View file

@ -351,10 +351,11 @@ def __str__(self):
def _load_metadata_from_file(self, metadata_set, metadata_role):
"""
<Purpose>
Load current or previous metadata if there is a local file. If the
expected file belonging to 'metadata_role' (e.g., 'root.json') cannot
be loaded, raise an exception. The extracted metadata object loaded
from file is saved to the metadata store (i.e., self.metadata).
Non-public method that loads current or previous metadata if there is a
local file. If the expected file belonging to 'metadata_role' (e.g.,
'root.json') cannot be loaded, raise an exception. The extracted metadata
object loaded from file is saved to the metadata store (i.e.,
self.metadata).
<Arguments>
metadata_set:
@ -427,10 +428,10 @@ def _load_metadata_from_file(self, metadata_set, metadata_role):
def _rebuild_key_and_role_db(self):
"""
<Purpose>
Rebuild the key and role databases from the currently trusted
'root' metadata object extracted from 'root.json'. This private
method is called when a new/updated 'root' metadata file is loaded.
This method will only store the role information for the top-level
Non-public method that rebuilds the key and role databases from the
currently trusted 'root' metadata object extracted from 'root.json'. This
private method is called when a new/updated 'root' metadata file is
loaded. This method will only store the role information of the top-level
roles (i.e., 'root', 'targets', 'snapshot', 'timestamp').
<Arguments>
@ -468,7 +469,7 @@ def _rebuild_key_and_role_db(self):
def _import_delegations(self, parent_role):
"""
<Purpose>
Import all the roles delegated by 'parent_role'.
Non-public method that imports all the roles delegated by 'parent_role'.
<Arguments>
parent_role:
@ -570,8 +571,8 @@ def refresh(self, unsafely_update_root_if_necessary=True):
If any metadata has expired.
<Side Effects>
Updates the metadata files for the top-level roles with the
latest information.
Updates the metadata files of the top-level roles with the latest
information.
<Returns>
None.
@ -641,11 +642,11 @@ def refresh(self, unsafely_update_root_if_necessary=True):
def _check_hashes(self, file_object, trusted_hashes):
"""
<Purpose>
A private helper method that verifies multiple secure hashes of the
downloaded file 'file_object'. If any of these fail it raises an
exception. This is to conform with the TUF spec, which support clients
with different hashing algorithms. The 'hash.py' module is used to compute
the hashes of 'file_object'.
Non-public method that verifies multiple secure hashes of the downloaded
file 'file_object'. If any of these fail it raises an exception. This is
to conform with the TUF spec, which support clients with different hashing
algorithms. The 'hash.py' module is used to compute the hashes of
'file_object'.
<Arguments>
file_object:
@ -687,9 +688,9 @@ def _check_hashes(self, file_object, trusted_hashes):
def _hard_check_file_length(self, file_object, trusted_file_length):
"""
<Purpose>
A private helper method that ensures the length of 'file_object' is
strictly equal to 'trusted_file_length'. This is a deliberately
redundant implementation designed to complement
Non-public method that ensures the length of 'file_object' is strictly
equal to 'trusted_file_length'. This is a deliberately redundant
implementation designed to complement
tuf.download._check_downloaded_length().
<Arguments>
@ -733,7 +734,7 @@ def _hard_check_file_length(self, file_object, trusted_file_length):
def _soft_check_file_length(self, file_object, trusted_file_length):
"""
<Purpose>
A private helper method that checks the trusted file length of a
Non-public method that checks the trusted file length of a
'tuf.util.TempFile' file-like object. The length of the file must be less
than or equal to the expected length. This is a deliberately redundant
implementation designed to complement
@ -779,9 +780,9 @@ def _soft_check_file_length(self, file_object, trusted_file_length):
def _get_target_file(self, target_filepath, file_length, file_hashes):
"""
<Purpose>
Safely (i.e., the file length and hash are strictly equal to the
trusted) download a target file up to a certain length, and check its
hashes thereafter.
Non-public method that safely (i.e., the file length and hash are strictly
equal to the trusted) downloads a target file up to a certain length, and
checks its hashes thereafter.
<Arguments>
target_filepath:
@ -839,8 +840,7 @@ def _verify_uncompressed_metadata_file(self, metadata_file_object,
metadata_role):
"""
<Purpose>
A private helper function to verify an uncompressed metadata
file.
Non-public method that verifies an uncompressed metadata file.
<Arguments>
metadata_file_object:
@ -929,9 +929,9 @@ def _unsafely_get_metadata_file(self, metadata_role, metadata_filepath,
"""
<Purpose>
Unsafely download a metadata file up to a certain length. The actual file
length may not be strictly equal to its expected length. File hashes will
not be checked because it is expected to be unknown.
Non-public method that downloads a metadata file up to a certain length.
The actual file length may not be strictly equal to its expected length.
File hashes will not be checked because it is expected to be unknown.
<Arguments>
metadata_role:
@ -1012,8 +1012,8 @@ def _safely_get_metadata_file(self, metadata_role, metadata_filepath,
compression=None, compressed_fileinfo=None):
"""
<Purpose>
Safely download a metadata file up to a certain length, and check its
hashes thereafter.
Non-public method that safely downloads a metadata file up to a certain
length, and checks its hashes thereafter.
<Arguments>
metadata_role:
@ -1095,9 +1095,9 @@ def _get_file(self, filepath, verify_file_function, file_type,
verify_compressed_file_function=None, download_safely=True):
"""
<Purpose>
Try downloading, up to a certain length, a metadata or target file from a
list of known mirrors. As soon as the first valid copy of the file is
found, the rest of the mirrors will be skipped.
Non-public method that tries downloading, up to a certain length, a
metadata or target file from a list of known mirrors. As soon as the first
valid copy of the file is found, the rest of the mirrors will be skipped.
<Arguments>
filepath:
@ -1196,11 +1196,11 @@ def _update_metadata(self, metadata_role, uncompressed_fileinfo,
compression=None, compressed_fileinfo=None):
"""
<Purpose>
Download, verify, and 'install' the metadata belonging to 'metadata_role'.
Calling this method implies the metadata has been updated by the
repository and thus needs to be re-downloaded. The current and previous
metadata stores are updated if the newly downloaded metadata is
successfully downloaded and verified.
Non-public method that downloads, verifies, and 'installs' the metadata
belonging to 'metadata_role'. Calling this method implies the metadata
has been updated by the repository and thus needs to be re-downloaded.
The current and previous metadata stores are updated if the newly
downloaded metadata is successfully downloaded and verified.
<Arguments>
metadata_role:
@ -1362,15 +1362,17 @@ def _update_metadata(self, metadata_role, uncompressed_fileinfo,
def _update_metadata_if_changed(self, metadata_role, referenced_metadata='snapshot'):
def _update_metadata_if_changed(self, metadata_role,
referenced_metadata='snapshot'):
"""
<Purpose>
Update the metadata for 'metadata_role' if it has changed. With the
exception of the 'timestamp' role, all the top-level roles are updated
by this method. The 'timestamp' role is always downloaded from a mirror
without first checking if it has been updated; it is updated in refresh()
by calling _update_metadata('timestamp'). This method is also called for
delegated role metadata, which are referenced by 'snapshot'.
Non-public method that updates the metadata for 'metadata_role' if it has
changed. With the exception of the 'timestamp' role, all the top-level
roles are updated by this method. The 'timestamp' role is always
downloaded from a mirror without first checking if it has been updated; it
is updated in refresh() by calling _update_metadata('timestamp'). This
method is also called for delegated role metadata, which are referenced by
'snapshot'.
If the metadata needs to be updated but an update cannot be obtained,
this method will delete the file (with the exception of the root
@ -1520,14 +1522,14 @@ def _update_metadata_if_changed(self, metadata_role, referenced_metadata='snapsh
def _fileinfo_has_changed(self, metadata_filename, new_fileinfo):
"""
<Purpose>
Determine whether the current fileinfo of 'metadata_filename'
differs from 'new_fileinfo'. The 'new_fileinfo' argument
should be extracted from the latest copy of the metadata
that references 'metadata_filename'. Example: 'root.json'
would be referenced by 'snapshot.json'.
Non-public method that determines whether the current fileinfo of
'metadata_filename' differs from 'new_fileinfo'. The 'new_fileinfo'
argument should be extracted from the latest copy of the metadata that
references 'metadata_filename'. Example: 'root.json' would be referenced
by 'snapshot.json'.
'new_fileinfo' should only be 'None' if this is for updating
'root.json' without having 'snapshot.json' available.
'new_fileinfo' should only be 'None' if this is for updating 'root.json'
without having 'snapshot.json' available.
<Arguments>
metadadata_filename:
@ -1592,10 +1594,11 @@ def _fileinfo_has_changed(self, metadata_filename, new_fileinfo):
def _update_fileinfo(self, metadata_filename):
"""
<Purpose>
Update the 'self.fileinfo' entry for the metadata belonging to
'metadata_filename'. If the 'current' metadata for 'metadata_filename'
cannot be loaded, set its fileinfo' to 'None' to signal that
it is not in the 'self.fileinfo' AND it also doesn't exist locally.
Non-public method that updates the 'self.fileinfo' entry for the metadata
belonging to 'metadata_filename'. If the 'current' metadata for
'metadata_filename' cannot be loaded, set its fileinfo' to 'None' to
signal that it is not in the 'self.fileinfo' AND it also doesn't exist
locally.
<Arguments>
metadata_filename:
@ -1638,8 +1641,8 @@ def _update_fileinfo(self, metadata_filename):
def _move_current_to_previous(self, metadata_role):
"""
<Purpose>
Move the current metadata file for 'metadata_role' to the previous
directory.
Non-public method that moves the current metadata file for 'metadata_role'
to the previous directory.
<Arguments>
metadata_role:
@ -1680,8 +1683,8 @@ def _move_current_to_previous(self, metadata_role):
def _delete_metadata(self, metadata_role):
"""
<Purpose>
Remove all (current) knowledge of 'metadata_role'. The metadata
belonging to 'metadata_role' is removed from the current
Non-public method that removes all (current) knowledge of 'metadata_role'.
The metadata belonging to 'metadata_role' is removed from the current
'self.metadata' store and from the role database. The 'root.json' role
file is never removed.
@ -1720,7 +1723,8 @@ def _delete_metadata(self, metadata_role):
def _ensure_not_expired(self, metadata_role):
"""
<Purpose>
Raise an exception if the current specified metadata has expired.
Non-public method that raises an exception if the current specified
metadata has expired.
<Arguments>
metadata_role:
@ -1819,28 +1823,28 @@ def all_targets(self):
def _refresh_targets_metadata(self, rolename='targets', include_delegations=False):
"""
<Purpose>
Refresh the targets metadata of 'rolename'. If 'include_delegations'
is True, include all the delegations that follow 'rolename'. The metadata
for the 'targets' role is updated in refresh() by the
_update_metadata_if_changed('targets') call, not here. Delegated roles
are not loaded when the repository is first initialized. They are loaded
from disk, updated if they have changed, and stored to the 'self.metadata'
store by this method. This method is called by the target methods,
like all_targets() and targets_of_role().
Non-public method that refreshes the targets metadata of 'rolename'. If
'include_delegations' is True, include all the delegations that follow
'rolename'. The metadata for the 'targets' role is updated in refresh()
by the _update_metadata_if_changed('targets') call, not here. Delegated
roles are not loaded when the repository is first initialized. They are
loaded from disk, updated if they have changed, and stored to the
'self.metadata' store by this method. This method is called by the target
methods, like all_targets() and targets_of_role().
<Arguments>
rolename:
This is a delegated role name and should not end
in '.json'. Example: 'targets/linux/x86'.
This is a delegated role name and should not end in '.json'. Example:
targets/linux/x86'.
include_delegations:
Boolean indicating if the delegated roles set by 'rolename' should
be refreshed.
Boolean indicating if the delegated roles set by 'rolename' should be
refreshed.
<Exceptions>
tuf.RepositoryError:
If the metadata file for the 'targets' role is missing
from the 'snapshot' metadata.
If the metadata file for the 'targets' role is missing from the
'snapshot' metadata.
<Side Effects>
The metadata for the delegated roles are loaded and updated if they
@ -1853,8 +1857,8 @@ def _refresh_targets_metadata(self, rolename='targets', include_delegations=Fals
roles_to_update = []
# See if this role provides metadata and, if we're including
# delegations, look for metadata from delegated roles.
# See if this role provides metadata and, if we're including delegations,
# look for metadata from delegated roles.
role_prefix = rolename + '/'
for metadata_path in self.metadata['current']['snapshot']['meta'].keys():
if metadata_path == rolename + '.json':
@ -1894,6 +1898,7 @@ def _refresh_targets_metadata(self, rolename='targets', include_delegations=Fals
# Remove the role if it has expired.
try:
self._ensure_not_expired(rolename)
except tuf.ExpiredMetadataError:
tuf.roledb.remove_role(rolename)
@ -2031,9 +2036,9 @@ def refresh_targets_metadata_chain(self, rolename):
def _targets_of_role(self, rolename, targets=None, skip_refresh=False):
"""
<Purpose>
Return the target information for all the targets of 'rolename'.
The returned information is a list conformant to
'tuf.formats.TARGETFILES_SCHEMA' and has the form:
Non-public method that returns the target information of all the targets
of 'rolename'. The returned information is a list conformant to
'tuf.formats.TARGETFILES_SCHEMA', and has the form:
[{'filepath': 'a/b/c.txt',
'fileinfo': {'length': 13323,
@ -2042,8 +2047,8 @@ def _targets_of_role(self, rolename, targets=None, skip_refresh=False):
<Arguments>
rolename:
This is a role name and should not end
in '.json'. Examples: 'targets', 'targets/linux/x86'.
This is a role name and should not end in '.json'. Examples: 'targets',
'targets/linux/x86'.
targets:
A list of targets containing target information, conformant to
@ -2103,7 +2108,7 @@ def targets_of_role(self, rolename='targets'):
<Purpose>
Return a list of trusted targets directly specified by 'rolename'.
The returned information is a list conformant to
tuf.formats.TARGETFILES_SCHEMA and has the form:
'tuf.formats.TARGETFILES_SCHEMA', and has the form:
[{'filepath': 'a/b/c.txt',
'fileinfo': {'length': 13323,
@ -2124,13 +2129,13 @@ def targets_of_role(self, rolename='targets'):
If 'rolename' is improperly formatted.
tuf.RepositoryError:
If the metadata of 'rolename' could not be updated.
If the metadata of 'rolename' cannot be updated.
tuf.UnknownRoleError:
If 'rolename' is not found in the role database.
<Side Effects>
The metadata for updated delegated roles are downloaded and stored.
The metadata of updated delegated roles are downloaded and stored.
<Returns>
A list of targets, conformant to 'tuf.formats.TARGETFILES_SCHEMA'.
@ -2140,6 +2145,9 @@ def targets_of_role(self, rolename='targets'):
# Raise 'tuf.FormatError' if there is a mismatch.
tuf.formats.RELPATH_SCHEMA.check_match(rolename)
if not tuf.roledb.role_exists(rolename):
raise tuf.UnknownRoleError(rolename)
self.refresh_targets_metadata_chain(rolename)
self._refresh_targets_metadata(rolename)
@ -2152,14 +2160,13 @@ def targets_of_role(self, rolename='targets'):
def target(self, target_filepath):
"""
<Purpose>
Return the target file information of 'target_filepath' and update
its corresponding metadata, if necessary.
Return the target file information of 'target_filepath', and update its
corresponding metadata, if necessary.
<Arguments>
target_filepath:
The path to the target file on the repository. This
will be relative to the 'targets' (or equivalent) directory
on a given mirror.
The path to the target file on the repository. This will be relative to
the 'targets' (or equivalent) directory on a given mirror.
<Exceptions>
tuf.FormatError:
@ -2197,6 +2204,7 @@ def target(self, target_filepath):
message = target_filepath+' not found.'
logger.error(message)
raise tuf.UnknownTargetError(message)
# Otherwise, return the found target.
else:
return target
@ -2208,9 +2216,9 @@ def target(self, target_filepath):
def _preorder_depth_first_walk(self, target_filepath):
"""
<Purpose>
Interrogate the tree of target delegations in order of appearance (which
implicitly order trustworthiness), and return the matching target
found in the most trusted role.
Non-public method that interrogates the tree of target delegations in
order of appearance (which implicitly order trustworthiness), and return
the matching target found in the most trusted role.
<Arguments>
target_filepath:
@ -2288,8 +2296,8 @@ def _preorder_depth_first_walk(self, target_filepath):
def _get_target_from_targets_role(self, role_name, targets, target_filepath):
"""
<Purpose>
Determine whether the targets role with the given 'role_name' has the
target with the name 'target_filepath'.
Non-public method that determines whether the targets role with the given
'role_name' has the target with the name 'target_filepath'.
<Arguments>
role_name:
@ -2336,8 +2344,8 @@ def _get_target_from_targets_role(self, role_name, targets, target_filepath):
def _visit_child_role(self, child_role, target_filepath):
"""
<Purpose>
Determine whether the given 'child_role' has been delegated the target
with the name 'target_filepath'.
Non-public method that determines whether the given 'child_role' has been
delegated the target with the name 'target_filepath'.
Ensure that we explore only delegated roles trusted with the target. We
assume conservation of delegated paths in the complete tree of
@ -2418,9 +2426,10 @@ def _visit_child_role(self, child_role, target_filepath):
def _get_target_hash(self, target_filepath, hash_function='sha256'):
"""
<Purpose>
Compute the hash of 'target_filepath'. This is useful in conjunction with
the "path_hash_prefixes" attribute in a delegated targets role, which
tells us which paths it is implicitly responsible for.
Non-public method that computes the hash of 'target_filepath'. This is
useful in conjunction with the "path_hash_prefixes" attribute in a
delegated targets role, which tells us which paths it is implicitly
responsible for.
<Arguments>
target_filepath:

View file

@ -30,7 +30,7 @@
is a Python binding to the NaCl library and is faster than the pure python
implementation. Verifying signatures can take approximately 0.0009 seconds.
PyNaCl relies on the libsodium C library. PyNaCl is required for key and
signature generation. Verifying signatures may be done in the pure Python.
signature generation. Verifying signatures may be done in pure Python.
https://github.com/pyca/pynacl
https://github.com/jedisct1/libsodium

View file

@ -30,6 +30,7 @@
tuf.formats.<SCHEMA>.matches(object)
Example:
rsa_key = {'keytype': 'rsa'
'keyid': 34892fc465ac76bc3232fab
'keyval': {'public': 'public_key',

View file

@ -1213,7 +1213,7 @@ def compressions(self):
'tuf.formats.COMPRESSIONS_SCHEMA'.
"""
tuf.roledb.get_roleinfo(self.rolename)
roleinfo = tuf.roledb.get_roleinfo(self.rolename)
compressions = roleinfo['compressions']
return compressions
@ -4449,9 +4449,9 @@ def write_metadata_file(metadata, filename, compressions, consistent_snapshot):
# Generate the compressed versions of 'metadata', if necessary. A compressed
# file may be written (without needed to write the uncompressed version) if
# the repository maintainer adds compression after writting the the
# uncompressed version.
# file may be written (without needing to write the uncompressed version) if
# the repository maintainer adds compression after writing the uncompressed
# version.
for compression in compressions:
file_object = None
@ -4474,10 +4474,10 @@ def write_metadata_file(metadata, filename, compressions, consistent_snapshot):
raise tuf.FormatError('Unknown compression algorithm: '+repr(compression))
# Save the compressed version, ensuring an unchanged file is not re-saved.
# Re-savign the same compressed version may cause its digest to unexpectedly
# Re-saving the same compressed version may cause its digest to unexpectedly
# change (gzip includes a timestamp) even though content has not changed.
_write_compressed_metadata(file_object, compressed_filename,
consistent_snapshot)
write_new_metadata, consistent_snapshot)
return written_filename
@ -4485,7 +4485,7 @@ def write_metadata_file(metadata, filename, compressions, consistent_snapshot):
def _write_compressed_metadata(file_object, compressed_filename,
consistent_snapshot):
write_new_metadata, consistent_snapshot):
"""
Write compressed versions of metadata, ensuring compressed file that have
not changed are not re-written, the digest of the compressed file is properly
@ -4497,7 +4497,7 @@ def _write_compressed_metadata(file_object, compressed_filename,
# If a consistent snapshot is unneeded, 'file_object' may be simply moved
# 'compressed_filename' if not already written.
if not consistent_snapshot:
if not os.path.exists(compressed_filename):
if not os.path.exists(compressed_filename) or write_new_metadata:
file_object.move(compressed_filename)
# The temporary file must be closed if 'file_object.move()' is not used.

View file

@ -18,7 +18,6 @@
TempFile class that generates a file-like object for temporary storage, etc.
"""
import os
import sys
import gzip