mirror of
https://github.com/theupdateframework/python-tuf
synced 2026-05-24 10:08:28 +00:00
Merge pull request #1463 from sechkova/review-preorder-dfs
ngclient: review preorder dfs code
This commit is contained in:
commit
55c8a7836e
3 changed files with 100 additions and 180 deletions
|
|
@ -242,10 +242,9 @@ def test_snapshot_serialization(self, test_case_data: str):
|
|||
"no path attribute":
|
||||
'{"keyids": ["keyid"], "name": "a", "terminating": false, \
|
||||
"path_hash_prefixes": ["h1", "h2"], "threshold": 99}',
|
||||
"no hash or path prefix":
|
||||
'{"keyids": ["keyid"], "name": "a", "terminating": true, "threshold": 3}',
|
||||
"unrecognized field":
|
||||
'{"keyids": ["keyid"], "name": "a", "terminating": true, "threshold": 3, "foo": "bar"}',
|
||||
'{"keyids": ["keyid"], "name": "a", "paths": ["fn1", "fn2"], \
|
||||
"terminating": true, "threshold": 3, "foo": "bar"}',
|
||||
}
|
||||
|
||||
@run_sub_tests_with_dataset(valid_delegated_roles)
|
||||
|
|
@ -255,12 +254,27 @@ def test_delegated_role_serialization(self, test_case_data: str):
|
|||
self.assertDictEqual(case_dict, deserialized_role.to_dict())
|
||||
|
||||
|
||||
invalid_delegated_roles: DataSet = {
|
||||
"missing hash prefixes and paths":
|
||||
'{"name": "a", "keyids": ["keyid"], "threshold": 1, "terminating": false}',
|
||||
"both hash prefixes and paths":
|
||||
'{"name": "a", "keyids": ["keyid"], "threshold": 1, "terminating": false, \
|
||||
"paths": ["fn1", "fn2"], "path_hash_prefixes": ["h1", "h2"]}',
|
||||
}
|
||||
|
||||
@run_sub_tests_with_dataset(invalid_delegated_roles)
|
||||
def test_invalid_delegated_role_serialization(self, test_case_data: str):
|
||||
case_dict = json.loads(test_case_data)
|
||||
with self.assertRaises(ValueError):
|
||||
DelegatedRole.from_dict(copy.copy(case_dict))
|
||||
|
||||
|
||||
valid_delegations: DataSet = {
|
||||
"all": '{"keys": {"keyid" : {"keytype": "rsa", "scheme": "rsassa-pss-sha256", "keyval": {"public": "foo"}}}, \
|
||||
"roles": [ {"keyids": ["keyid"], "name": "a", "terminating": true, "threshold": 3} ]}',
|
||||
"roles": [ {"keyids": ["keyid"], "name": "a", "paths": ["fn1", "fn2"], "terminating": true, "threshold": 3} ]}',
|
||||
"unrecognized field":
|
||||
'{"keys": {"keyid" : {"keytype": "rsa", "scheme": "rsassa-pss-sha256", "keyval": {"public": "foo"}}}, \
|
||||
"roles": [ {"keyids": ["keyid"], "name": "a", "terminating": true, "threshold": 3} ], \
|
||||
"roles": [ {"keyids": ["keyid"], "name": "a", "paths": ["fn1", "fn2"], "terminating": true, "threshold": 3} ], \
|
||||
"foo": "bar"}',
|
||||
}
|
||||
|
||||
|
|
@ -305,13 +319,13 @@ def test_targetfile_serialization(self, test_case_data: str):
|
|||
"targets": { "file.txt": {"length": 12, "hashes": {"sha256" : "abc"} } }, \
|
||||
"delegations": {"keys": {"keyid" : {"keytype": "rsa", \
|
||||
"scheme": "rsassa-pss-sha256", "keyval": {"public": "foo"} }}, \
|
||||
"roles": [ {"keyids": ["keyid"], "name": "a", "terminating": true, "threshold": 3} ]} \
|
||||
"roles": [ {"keyids": ["keyid"], "name": "a", "paths": ["fn1", "fn2"], "terminating": true, "threshold": 3} ]} \
|
||||
}',
|
||||
"empty targets": '{"_type": "targets", "spec_version": "1.0.0", "version": 1, "expires": "2030-01-01T00:00:00Z", \
|
||||
"targets": {}, \
|
||||
"delegations": {"keys": {"keyid" : {"keytype": "rsa", \
|
||||
"scheme": "rsassa-pss-sha256", "keyval": {"public": "foo"} }}, \
|
||||
"roles": [ {"keyids": ["keyid"], "name": "a", "terminating": true, "threshold": 3} ]} \
|
||||
"roles": [ {"keyids": ["keyid"], "name": "a", "paths": ["fn1", "fn2"], "terminating": true, "threshold": 3} ]} \
|
||||
}',
|
||||
"no delegations": '{"_type": "targets", "spec_version": "1.0.0", "version": 1, "expires": "2030-01-01T00:00:00Z", \
|
||||
"targets": { "file.txt": {"length": 12, "hashes": {"sha256" : "abc"} } } \
|
||||
|
|
|
|||
|
|
@ -16,8 +16,10 @@
|
|||
|
||||
"""
|
||||
import abc
|
||||
import fnmatch
|
||||
import io
|
||||
import logging
|
||||
import os
|
||||
import tempfile
|
||||
from collections import OrderedDict
|
||||
from datetime import datetime, timedelta
|
||||
|
|
@ -960,12 +962,12 @@ def update(self, rolename: str, role_info: MetaFile) -> None:
|
|||
class DelegatedRole(Role):
|
||||
"""A container with information about a delegated role.
|
||||
|
||||
A delegation can happen in three ways:
|
||||
- paths is None and path_hash_prefixes is None: delegates all targets
|
||||
A delegation can happen in two ways:
|
||||
- paths is set: delegates targets matching any path pattern in paths
|
||||
- path_hash_prefixes is set: delegates targets whose target path hash
|
||||
starts with any of the prefixes in path_hash_prefixes
|
||||
paths and path_hash_prefixes are mutually exclusive: both cannot be set.
|
||||
paths and path_hash_prefixes are mutually exclusive: both cannot be set,
|
||||
at least one of them must be set.
|
||||
|
||||
Attributes:
|
||||
name: A string giving the name of the delegated role.
|
||||
|
|
@ -990,10 +992,11 @@ def __init__(
|
|||
self.name = name
|
||||
self.terminating = terminating
|
||||
if paths is not None and path_hash_prefixes is not None:
|
||||
raise ValueError(
|
||||
"Only one of the attributes 'paths' and"
|
||||
"'path_hash_prefixes' can be set!"
|
||||
)
|
||||
raise ValueError("Either paths or path_hash_prefixes can be set")
|
||||
|
||||
if paths is None and path_hash_prefixes is None:
|
||||
raise ValueError("One of paths or path_hash_prefixes must be set")
|
||||
|
||||
self.paths = paths
|
||||
self.path_hash_prefixes = path_hash_prefixes
|
||||
|
||||
|
|
@ -1031,6 +1034,35 @@ def to_dict(self) -> Dict[str, Any]:
|
|||
res_dict["path_hash_prefixes"] = self.path_hash_prefixes
|
||||
return res_dict
|
||||
|
||||
def is_delegated_path(self, target_filepath: str) -> bool:
|
||||
"""Determines whether the given 'target_filepath' is in one of
|
||||
the paths that DelegatedRole is trusted to provide"""
|
||||
|
||||
if self.path_hash_prefixes is not None:
|
||||
# Calculate the hash of the filepath
|
||||
# to determine in which bin to find the target.
|
||||
digest_object = sslib_hash.digest(algorithm="sha256")
|
||||
digest_object.update(target_filepath.encode("utf-8"))
|
||||
target_filepath_hash = digest_object.hexdigest()
|
||||
|
||||
for path_hash_prefix in self.path_hash_prefixes:
|
||||
if target_filepath_hash.startswith(path_hash_prefix):
|
||||
return True
|
||||
|
||||
elif self.paths is not None:
|
||||
for pathpattern in self.paths:
|
||||
# A delegated role path may be an explicit path or glob
|
||||
# pattern (Unix shell-style wildcards). Explicit filepaths
|
||||
# are also considered matches. Make sure to strip any leading
|
||||
# path separators so that a match is made.
|
||||
# Example: "foo.tgz" should match with "/*.tgz".
|
||||
if fnmatch.fnmatch(
|
||||
target_filepath.lstrip(os.sep), pathpattern.lstrip(os.sep)
|
||||
):
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
class Delegations:
|
||||
"""A container object storing information about all delegations.
|
||||
|
|
|
|||
|
|
@ -58,16 +58,15 @@
|
|||
updater.download_target(targetinfo, "~/tufclient/downloads/")
|
||||
"""
|
||||
|
||||
import fnmatch
|
||||
import logging
|
||||
import os
|
||||
from typing import Any, Dict, List, Optional
|
||||
from typing import Any, Dict, List, Optional, Set, Tuple
|
||||
from urllib import parse
|
||||
|
||||
from securesystemslib import hash as sslib_hash
|
||||
from securesystemslib import util as sslib_util
|
||||
|
||||
from tuf import exceptions
|
||||
from tuf.api.metadata import Targets
|
||||
from tuf.ngclient._internal import requests_fetcher, trusted_metadata_set
|
||||
from tuf.ngclient.config import UpdaterConfig
|
||||
from tuf.ngclient.fetcher import FetcherInterface
|
||||
|
|
@ -143,7 +142,9 @@ def refresh(self) -> None:
|
|||
self._load_snapshot()
|
||||
self._load_targets("targets", "root")
|
||||
|
||||
def get_one_valid_targetinfo(self, target_path: str) -> Dict:
|
||||
def get_one_valid_targetinfo(
|
||||
self, target_path: str
|
||||
) -> Optional[Dict[str, Any]]:
|
||||
"""Returns target information for 'target_path'.
|
||||
|
||||
The return value can be used as an argument to
|
||||
|
|
@ -169,6 +170,9 @@ def get_one_valid_targetinfo(self, target_path: str) -> Dict:
|
|||
OSError: New metadata could not be written to disk
|
||||
RepositoryError: Metadata failed to verify in some way
|
||||
TODO: download-related errors
|
||||
|
||||
Returns:
|
||||
A targetinfo dictionary or None
|
||||
"""
|
||||
return self._preorder_depth_first_walk(target_path)
|
||||
|
||||
|
|
@ -368,26 +372,27 @@ def _load_targets(self, role: str, parent_role: str) -> None:
|
|||
self._trusted_set.update_delegated_targets(data, role, parent_role)
|
||||
self._persist_metadata(role, data)
|
||||
|
||||
def _preorder_depth_first_walk(self, target_filepath) -> Dict:
|
||||
def _preorder_depth_first_walk(
|
||||
self, target_filepath: str
|
||||
) -> Optional[Dict[str, Any]]:
|
||||
"""
|
||||
Interrogates the tree of target delegations in order of appearance
|
||||
(which implicitly order trustworthiness), and returns the matching
|
||||
target found in the most trusted role.
|
||||
"""
|
||||
|
||||
target = None
|
||||
role_names = [("targets", "root")]
|
||||
visited_role_names = set()
|
||||
# List of delegations to be interrogated. A (role, parent role) pair
|
||||
# is needed to load and verify the delegated targets metadata.
|
||||
delegations_to_visit = [("targets", "root")]
|
||||
visited_role_names: Set[Tuple[str, str]] = set()
|
||||
number_of_delegations = self.config.max_delegations
|
||||
|
||||
# Preorder depth-first traversal of the graph of target delegations.
|
||||
while (
|
||||
target is None and number_of_delegations > 0 and len(role_names) > 0
|
||||
):
|
||||
while number_of_delegations > 0 and len(delegations_to_visit) > 0:
|
||||
|
||||
# Pop the role name from the top of the stack.
|
||||
role_name, parent_role = role_names.pop(-1)
|
||||
self._load_targets(role_name, parent_role)
|
||||
role_name, parent_role = delegations_to_visit.pop(-1)
|
||||
|
||||
# Skip any visited current role to prevent cycles.
|
||||
if (role_name, parent_role) in visited_role_names:
|
||||
logger.debug("Skipping visited current role %s", role_name)
|
||||
|
|
@ -395,183 +400,52 @@ def _preorder_depth_first_walk(self, target_filepath) -> Dict:
|
|||
|
||||
# The metadata for 'role_name' must be downloaded/updated before
|
||||
# its targets, delegations, and child roles can be inspected.
|
||||
self._load_targets(role_name, parent_role)
|
||||
|
||||
role_metadata = self._trusted_set[role_name].signed
|
||||
role_metadata: Targets = self._trusted_set[role_name].signed
|
||||
target = role_metadata.targets.get(target_filepath)
|
||||
|
||||
if target is not None:
|
||||
logger.debug("Found target in current role %s", role_name)
|
||||
return {"filepath": target_filepath, "fileinfo": target}
|
||||
|
||||
# After preorder check, add current role to set of visited roles.
|
||||
visited_role_names.add((role_name, parent_role))
|
||||
|
||||
# And also decrement number of visited roles.
|
||||
number_of_delegations -= 1
|
||||
child_roles = []
|
||||
|
||||
if role_metadata.delegations is not None:
|
||||
child_roles = role_metadata.delegations.roles
|
||||
|
||||
if target is None:
|
||||
|
||||
child_roles_to_visit = []
|
||||
# NOTE: This may be a slow operation if there are many
|
||||
# delegated roles.
|
||||
for child_role in child_roles:
|
||||
child_role_name = _visit_child_role(
|
||||
child_role, target_filepath
|
||||
)
|
||||
for child_role in role_metadata.delegations.roles:
|
||||
if child_role.is_delegated_path(target_filepath):
|
||||
logger.debug("Adding child role %s", child_role.name)
|
||||
|
||||
if child_role.terminating and child_role_name is not None:
|
||||
logger.debug(
|
||||
"Adding child role %s.\n"
|
||||
"Not backtracking to other roles.",
|
||||
child_role_name,
|
||||
)
|
||||
role_names = []
|
||||
child_roles_to_visit.append(
|
||||
(child_role_name, role_name)
|
||||
(child_role.name, role_name)
|
||||
)
|
||||
break
|
||||
|
||||
if child_role_name is None:
|
||||
logger.debug("Skipping child role %s", child_role_name)
|
||||
|
||||
else:
|
||||
logger.debug("Adding child role %s", child_role_name)
|
||||
child_roles_to_visit.append(
|
||||
(child_role_name, role_name)
|
||||
)
|
||||
|
||||
if child_role.terminating:
|
||||
logger.debug("Not backtracking to other roles.")
|
||||
delegations_to_visit = []
|
||||
break
|
||||
# Push 'child_roles_to_visit' in reverse order of appearance
|
||||
# onto 'role_names'. Roles are popped from the end of
|
||||
# the 'role_names' list.
|
||||
# onto 'delegations_to_visit'. Roles are popped from the end of
|
||||
# the list.
|
||||
child_roles_to_visit.reverse()
|
||||
role_names.extend(child_roles_to_visit)
|
||||
delegations_to_visit.extend(child_roles_to_visit)
|
||||
|
||||
else:
|
||||
logger.debug("Found target in current role %s", role_name)
|
||||
|
||||
if (
|
||||
target is None
|
||||
and number_of_delegations == 0
|
||||
and len(role_names) > 0
|
||||
):
|
||||
if number_of_delegations == 0 and len(delegations_to_visit) > 0:
|
||||
logger.debug(
|
||||
"%d roles left to visit, but allowed to "
|
||||
"visit at most %d delegations.",
|
||||
len(role_names),
|
||||
len(delegations_to_visit),
|
||||
self.config.max_delegations,
|
||||
)
|
||||
|
||||
return {"filepath": target_filepath, "fileinfo": target}
|
||||
|
||||
|
||||
def _visit_child_role(child_role: Dict, target_filepath: str) -> str:
|
||||
"""
|
||||
<Purpose>
|
||||
Non-public method that determines whether the given 'target_filepath'
|
||||
is an allowed path of 'child_role'.
|
||||
|
||||
Ensure that we explore only delegated roles trusted with the target. The
|
||||
metadata for 'child_role' should have been refreshed prior to this point,
|
||||
however, the paths/targets that 'child_role' signs for have not been
|
||||
verified (as intended). The paths/targets that 'child_role' is allowed
|
||||
to specify in its metadata depends on the delegating role, and thus is
|
||||
left to the caller to verify. We verify here that 'target_filepath'
|
||||
is an allowed path according to the delegated 'child_role'.
|
||||
|
||||
TODO: Should the TUF spec restrict the repository to one particular
|
||||
algorithm? Should we allow the repository to specify in the role
|
||||
dictionary the algorithm used for these generated hashed paths?
|
||||
|
||||
<Arguments>
|
||||
child_role:
|
||||
The delegation targets role object of 'child_role', containing its
|
||||
paths, path_hash_prefixes, keys, and so on.
|
||||
|
||||
target_filepath:
|
||||
The path to the target file on the repository. This will be relative to
|
||||
the 'targets' (or equivalent) directory on a given mirror.
|
||||
|
||||
<Exceptions>
|
||||
None.
|
||||
|
||||
<Side Effects>
|
||||
None.
|
||||
|
||||
<Returns>
|
||||
If 'child_role' has been delegated the target with the name
|
||||
'target_filepath', then we return the role name of 'child_role'.
|
||||
|
||||
Otherwise, we return None.
|
||||
"""
|
||||
|
||||
child_role_name = child_role.name
|
||||
child_role_paths = child_role.paths
|
||||
child_role_path_hash_prefixes = child_role.path_hash_prefixes
|
||||
|
||||
if child_role_path_hash_prefixes is not None:
|
||||
target_filepath_hash = _get_filepath_hash(target_filepath)
|
||||
for child_role_path_hash_prefix in child_role_path_hash_prefixes:
|
||||
if not target_filepath_hash.startswith(child_role_path_hash_prefix):
|
||||
continue
|
||||
|
||||
return child_role_name
|
||||
|
||||
elif child_role_paths is not None:
|
||||
# Is 'child_role_name' allowed to sign for 'target_filepath'?
|
||||
for child_role_path in child_role_paths:
|
||||
# A child role path may be an explicit path or glob pattern (Unix
|
||||
# shell-style wildcards). The child role 'child_role_name' is
|
||||
# returned if 'target_filepath' is equal to or matches
|
||||
# 'child_role_path'. Explicit filepaths are also considered
|
||||
# matches. A repo maintainer might delegate a glob pattern with a
|
||||
# leading path separator, while the client requests a matching
|
||||
# target without a leading path separator - make sure to strip any
|
||||
# leading path separators so that a match is made.
|
||||
# Example: "foo.tgz" should match with "/*.tgz".
|
||||
if fnmatch.fnmatch(
|
||||
target_filepath.lstrip(os.sep), child_role_path.lstrip(os.sep)
|
||||
):
|
||||
logger.debug(
|
||||
"Child role %s is allowed to sign for %s",
|
||||
repr(child_role_name),
|
||||
repr(target_filepath),
|
||||
)
|
||||
|
||||
return child_role_name
|
||||
|
||||
logger.debug(
|
||||
"The given target path %s "
|
||||
"does not match the trusted path or glob pattern: %s",
|
||||
repr(target_filepath),
|
||||
repr(child_role_path),
|
||||
)
|
||||
continue
|
||||
|
||||
else:
|
||||
# 'role_name' should have been validated when it was downloaded.
|
||||
# The 'paths' or 'path_hash_prefixes' fields should not be missing,
|
||||
# so we raise a format error here in case they are both missing.
|
||||
raise exceptions.FormatError(
|
||||
repr(child_role_name) + " "
|
||||
'has neither a "paths" nor "path_hash_prefixes". At least'
|
||||
" one of these attributes must be present."
|
||||
)
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def _get_filepath_hash(target_filepath, hash_function="sha256"):
|
||||
"""
|
||||
Calculate the hash of the filepath to determine which bin to find the
|
||||
target.
|
||||
"""
|
||||
# The client currently assumes the repository (i.e., repository
|
||||
# tool) uses 'hash_function' to generate hashes and UTF-8.
|
||||
digest_object = sslib_hash.digest(hash_function)
|
||||
encoded_target_filepath = target_filepath.encode("utf-8")
|
||||
digest_object.update(encoded_target_filepath)
|
||||
target_filepath_hash = digest_object.hexdigest()
|
||||
|
||||
return target_filepath_hash
|
||||
# If this point is reached then target is not found, return None
|
||||
return None
|
||||
|
||||
|
||||
def _ensure_trailing_slash(url: str):
|
||||
|
|
|
|||
Loading…
Reference in a new issue