mirror of
https://github.com/theupdateframework/python-tuf
synced 2026-05-24 10:08:28 +00:00
Create constants for top-level rolenames
This is a change in the metadata API to remove hardcoded rolenames and use constants instead. Fixes #1648 Signed-off-by: Ivana Atanasova <iyovcheva@iyovcheva-a02.vmware.com>
This commit is contained in:
parent
d991362ff0
commit
d7c653470a
3 changed files with 63 additions and 59 deletions
|
|
@ -61,6 +61,11 @@
|
|||
SignedSerializer,
|
||||
)
|
||||
|
||||
_ROOT: str = "root"
|
||||
_SNAPSHOT: str = "snapshot"
|
||||
_TARGETS: str = "targets"
|
||||
_TIMESTAMP: str = "timestamp"
|
||||
|
||||
# pylint: disable=too-many-lines
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
|
@ -68,7 +73,7 @@
|
|||
# We aim to support SPECIFICATION_VERSION and require the input metadata
|
||||
# files to have the same major version (the first number) as ours.
|
||||
SPECIFICATION_VERSION = ["1", "0", "19"]
|
||||
TOP_LEVEL_ROLE_NAMES = {"root", "timestamp", "snapshot", "targets"}
|
||||
TOP_LEVEL_ROLE_NAMES = {_ROOT, _TIMESTAMP, _SNAPSHOT, _TARGETS}
|
||||
|
||||
# T is a Generic type constraint for Metadata.signed
|
||||
T = TypeVar("T", "Root", "Timestamp", "Snapshot", "Targets")
|
||||
|
|
@ -130,13 +135,13 @@ def from_dict(cls, metadata: Dict[str, Any]) -> "Metadata[T]":
|
|||
# Dispatch to contained metadata class on metadata _type field.
|
||||
_type = metadata["signed"]["_type"]
|
||||
|
||||
if _type == "targets":
|
||||
if _type == _TARGETS:
|
||||
inner_cls: Type[Signed] = Targets
|
||||
elif _type == "snapshot":
|
||||
elif _type == _SNAPSHOT:
|
||||
inner_cls = Snapshot
|
||||
elif _type == "timestamp":
|
||||
elif _type == _TIMESTAMP:
|
||||
inner_cls = Timestamp
|
||||
elif _type == "root":
|
||||
elif _type == _ROOT:
|
||||
inner_cls = Root
|
||||
else:
|
||||
raise ValueError(f'unrecognized metadata type "{_type}"')
|
||||
|
|
@ -394,18 +399,13 @@ class Signed(metaclass=abc.ABCMeta):
|
|||
unrecognized_fields: Dictionary of all unrecognized fields.
|
||||
"""
|
||||
|
||||
# Signed implementations are expected to override this
|
||||
_signed_type: ClassVar[str] = "signed"
|
||||
# type is required for static reference without changing the API
|
||||
type: ClassVar[str] = "signed"
|
||||
|
||||
# _type and type are identical: 1st replicates file format, 2nd passes lint
|
||||
@property
|
||||
def _type(self) -> str:
|
||||
return self._signed_type
|
||||
|
||||
@property
|
||||
def type(self) -> str:
|
||||
"""Metadata type as string."""
|
||||
return self._signed_type
|
||||
return self.type
|
||||
|
||||
# NOTE: Signed is a stupid name, because this might not be signed yet, but
|
||||
# we keep it to match spec terminology (I often refer to this as "payload",
|
||||
|
|
@ -458,8 +458,8 @@ def _common_fields_from_dict(
|
|||
|
||||
"""
|
||||
_type = signed_dict.pop("_type")
|
||||
if _type != cls._signed_type:
|
||||
raise ValueError(f"Expected type {cls._signed_type}, got {_type}")
|
||||
if _type != cls.type:
|
||||
raise ValueError(f"Expected type {cls.type}, got {_type}")
|
||||
|
||||
version = signed_dict.pop("version")
|
||||
spec_version = signed_dict.pop("spec_version")
|
||||
|
|
@ -712,7 +712,7 @@ class Root(Signed):
|
|||
unrecognized_fields: Dictionary of all unrecognized fields.
|
||||
"""
|
||||
|
||||
_signed_type = "root"
|
||||
type = _ROOT
|
||||
|
||||
# TODO: determine an appropriate value for max-args
|
||||
# pylint: disable=too-many-arguments
|
||||
|
|
@ -965,7 +965,7 @@ class Timestamp(Signed):
|
|||
snapshot_meta: Meta information for snapshot metadata.
|
||||
"""
|
||||
|
||||
_signed_type = "timestamp"
|
||||
type = _TIMESTAMP
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
|
|
@ -1015,7 +1015,7 @@ class Snapshot(Signed):
|
|||
meta: A dictionary of target metadata filenames to MetaFile objects.
|
||||
"""
|
||||
|
||||
_signed_type = "snapshot"
|
||||
type = _SNAPSHOT
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
|
|
@ -1416,7 +1416,7 @@ class Targets(Signed):
|
|||
unrecognized_fields: Dictionary of all unrecognized fields.
|
||||
"""
|
||||
|
||||
_signed_type = "targets"
|
||||
type = _TARGETS
|
||||
|
||||
# TODO: determine an appropriate value for max-args
|
||||
# pylint: disable=too-many-arguments
|
||||
|
|
@ -1437,7 +1437,7 @@ def __init__(
|
|||
def from_dict(cls, signed_dict: Dict[str, Any]) -> "Targets":
|
||||
"""Creates Targets object from its dict representation."""
|
||||
common_args = cls._common_fields_from_dict(signed_dict)
|
||||
targets = signed_dict.pop("targets")
|
||||
targets = signed_dict.pop(_TARGETS)
|
||||
try:
|
||||
delegations_dict = signed_dict.pop("delegations")
|
||||
except KeyError:
|
||||
|
|
@ -1458,7 +1458,7 @@ def to_dict(self) -> Dict[str, Any]:
|
|||
targets = {}
|
||||
for target_path, target_file_obj in self.targets.items():
|
||||
targets[target_path] = target_file_obj.to_dict()
|
||||
targets_dict["targets"] = targets
|
||||
targets_dict[_TARGETS] = targets
|
||||
if self.delegations is not None:
|
||||
targets_dict["delegations"] = self.delegations.to_dict()
|
||||
return targets_dict
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@
|
|||
network IO, which are not handled here.
|
||||
|
||||
Loaded metadata can be accessed via index access with rolename as key
|
||||
(trusted_set["root"]) or, in the case of top-level metadata, using the helper
|
||||
(trusted_set[Root.type]) or, in the case of top-level metadata, using the helper
|
||||
properties (trusted_set.root).
|
||||
|
||||
The rules that TrustedMetadataSet follows for top-level metadata are
|
||||
|
|
@ -35,7 +35,7 @@
|
|||
>>> trusted_set = TrustedMetadataSet(f.read())
|
||||
>>>
|
||||
>>> # update root from remote until no more are available
|
||||
>>> with download("root", trusted_set.root.signed.version + 1) as f:
|
||||
>>> with download(Root.type, trusted_set.root.signed.version + 1) as f:
|
||||
>>> trusted_set.update_root(f.read())
|
||||
>>>
|
||||
>>> # load local timestamp, then update from remote
|
||||
|
|
@ -45,7 +45,7 @@
|
|||
>>> except (RepositoryError, OSError):
|
||||
>>> pass # failure to load a local file is ok
|
||||
>>>
|
||||
>>> with download("timestamp") as f:
|
||||
>>> with download(Timestamp.type) as f:
|
||||
>>> trusted_set.update_timestamp(f.read())
|
||||
>>>
|
||||
>>> # load local snapshot, then update from remote if needed
|
||||
|
|
@ -55,7 +55,7 @@
|
|||
>>> except (RepositoryError, OSError):
|
||||
>>> # local snapshot is not valid, load from remote
|
||||
>>> # (RepositoryErrors here stop the update)
|
||||
>>> with download("snapshot", version) as f:
|
||||
>>> with download(Snapshot.type, version) as f:
|
||||
>>> trusted_set.update_snapshot(f.read())
|
||||
|
||||
TODO:
|
||||
|
|
@ -123,22 +123,22 @@ def __iter__(self) -> Iterator[Metadata]:
|
|||
@property
|
||||
def root(self) -> Metadata[Root]:
|
||||
"""Current root Metadata"""
|
||||
return self._trusted_set["root"]
|
||||
return self._trusted_set[Root.type]
|
||||
|
||||
@property
|
||||
def timestamp(self) -> Optional[Metadata[Timestamp]]:
|
||||
"""Current timestamp Metadata or None"""
|
||||
return self._trusted_set.get("timestamp")
|
||||
return self._trusted_set.get(Timestamp.type)
|
||||
|
||||
@property
|
||||
def snapshot(self) -> Optional[Metadata[Snapshot]]:
|
||||
"""Current snapshot Metadata or None"""
|
||||
return self._trusted_set.get("snapshot")
|
||||
return self._trusted_set.get(Snapshot.type)
|
||||
|
||||
@property
|
||||
def targets(self) -> Optional[Metadata[Targets]]:
|
||||
"""Current targets Metadata or None"""
|
||||
return self._trusted_set.get("targets")
|
||||
return self._trusted_set.get(Targets.type)
|
||||
|
||||
# Methods for updating metadata
|
||||
def update_root(self, data: bytes) -> Metadata[Root]:
|
||||
|
|
@ -166,23 +166,25 @@ def update_root(self, data: bytes) -> Metadata[Root]:
|
|||
except DeserializationError as e:
|
||||
raise exceptions.RepositoryError("Failed to load root") from e
|
||||
|
||||
if new_root.signed.type != "root":
|
||||
if new_root.signed.type != Root.type:
|
||||
raise exceptions.RepositoryError(
|
||||
f"Expected 'root', got '{new_root.signed.type}'"
|
||||
)
|
||||
|
||||
# Verify that new root is signed by trusted root
|
||||
self.root.verify_delegate("root", new_root)
|
||||
self.root.verify_delegate(Root.type, new_root)
|
||||
|
||||
if new_root.signed.version != self.root.signed.version + 1:
|
||||
raise exceptions.ReplayedMetadataError(
|
||||
"root", new_root.signed.version, self.root.signed.version
|
||||
Root.type,
|
||||
new_root.signed.version,
|
||||
self.root.signed.version,
|
||||
)
|
||||
|
||||
# Verify that new root is signed by itself
|
||||
new_root.verify_delegate("root", new_root)
|
||||
new_root.verify_delegate(Root.type, new_root)
|
||||
|
||||
self._trusted_set["root"] = new_root
|
||||
self._trusted_set[Root.type] = new_root
|
||||
logger.info("Updated root v%d", new_root.signed.version)
|
||||
|
||||
return new_root
|
||||
|
|
@ -222,12 +224,12 @@ def update_timestamp(self, data: bytes) -> Metadata[Timestamp]:
|
|||
except DeserializationError as e:
|
||||
raise exceptions.RepositoryError("Failed to load timestamp") from e
|
||||
|
||||
if new_timestamp.signed.type != "timestamp":
|
||||
if new_timestamp.signed.type != Timestamp.type:
|
||||
raise exceptions.RepositoryError(
|
||||
f"Expected 'timestamp', got '{new_timestamp.signed.type}'"
|
||||
)
|
||||
|
||||
self.root.verify_delegate("timestamp", new_timestamp)
|
||||
self.root.verify_delegate(Timestamp.type, new_timestamp)
|
||||
|
||||
# If an existing trusted timestamp is updated,
|
||||
# check for a rollback attack
|
||||
|
|
@ -235,7 +237,7 @@ def update_timestamp(self, data: bytes) -> Metadata[Timestamp]:
|
|||
# Prevent rolling back timestamp version
|
||||
if new_timestamp.signed.version < self.timestamp.signed.version:
|
||||
raise exceptions.ReplayedMetadataError(
|
||||
"timestamp",
|
||||
Timestamp.type,
|
||||
new_timestamp.signed.version,
|
||||
self.timestamp.signed.version,
|
||||
)
|
||||
|
|
@ -245,7 +247,7 @@ def update_timestamp(self, data: bytes) -> Metadata[Timestamp]:
|
|||
< self.timestamp.signed.snapshot_meta.version
|
||||
):
|
||||
raise exceptions.ReplayedMetadataError(
|
||||
"snapshot",
|
||||
Snapshot.type,
|
||||
new_timestamp.signed.snapshot_meta.version,
|
||||
self.timestamp.signed.snapshot_meta.version,
|
||||
)
|
||||
|
|
@ -253,7 +255,7 @@ def update_timestamp(self, data: bytes) -> Metadata[Timestamp]:
|
|||
# expiry not checked to allow old timestamp to be used for rollback
|
||||
# protection of new timestamp: expiry is checked in update_snapshot()
|
||||
|
||||
self._trusted_set["timestamp"] = new_timestamp
|
||||
self._trusted_set[Timestamp.type] = new_timestamp
|
||||
logger.info("Updated timestamp v%d", new_timestamp.signed.version)
|
||||
|
||||
# timestamp is loaded: raise if it is not valid _final_ timestamp
|
||||
|
|
@ -323,12 +325,12 @@ def update_snapshot(
|
|||
except DeserializationError as e:
|
||||
raise exceptions.RepositoryError("Failed to load snapshot") from e
|
||||
|
||||
if new_snapshot.signed.type != "snapshot":
|
||||
if new_snapshot.signed.type != Snapshot.type:
|
||||
raise exceptions.RepositoryError(
|
||||
f"Expected 'snapshot', got '{new_snapshot.signed.type}'"
|
||||
)
|
||||
|
||||
self.root.verify_delegate("snapshot", new_snapshot)
|
||||
self.root.verify_delegate(Snapshot.type, new_snapshot)
|
||||
|
||||
# version not checked against meta version to allow old snapshot to be
|
||||
# used in rollback protection: it is checked when targets is updated
|
||||
|
|
@ -354,7 +356,7 @@ def update_snapshot(
|
|||
# expiry not checked to allow old snapshot to be used for rollback
|
||||
# protection of new snapshot: it is checked when targets is updated
|
||||
|
||||
self._trusted_set["snapshot"] = new_snapshot
|
||||
self._trusted_set[Snapshot.type] = new_snapshot
|
||||
logger.info("Updated snapshot v%d", new_snapshot.signed.version)
|
||||
|
||||
# snapshot is loaded, but we raise if it's not valid _final_ snapshot
|
||||
|
|
@ -389,7 +391,7 @@ def update_targets(self, data: bytes) -> Metadata[Targets]:
|
|||
Returns:
|
||||
Deserialized and verified targets Metadata object
|
||||
"""
|
||||
return self.update_delegated_targets(data, "targets", "root")
|
||||
return self.update_delegated_targets(data, Targets.type, Root.type)
|
||||
|
||||
def update_delegated_targets(
|
||||
self, data: bytes, role_name: str, delegator_name: str
|
||||
|
|
@ -440,7 +442,7 @@ def update_delegated_targets(
|
|||
except DeserializationError as e:
|
||||
raise exceptions.RepositoryError("Failed to load snapshot") from e
|
||||
|
||||
if new_delegate.signed.type != "targets":
|
||||
if new_delegate.signed.type != Targets.type:
|
||||
raise exceptions.RepositoryError(
|
||||
f"Expected 'targets', got '{new_delegate.signed.type}'"
|
||||
)
|
||||
|
|
@ -472,12 +474,12 @@ def _load_trusted_root(self, data: bytes) -> None:
|
|||
except DeserializationError as e:
|
||||
raise exceptions.RepositoryError("Failed to load root") from e
|
||||
|
||||
if new_root.signed.type != "root":
|
||||
if new_root.signed.type != Root.type:
|
||||
raise exceptions.RepositoryError(
|
||||
f"Expected 'root', got '{new_root.signed.type}'"
|
||||
)
|
||||
|
||||
new_root.verify_delegate("root", new_root)
|
||||
new_root.verify_delegate(Root.type, new_root)
|
||||
|
||||
self._trusted_set["root"] = new_root
|
||||
self._trusted_set[Root.type] = new_root
|
||||
logger.info("Loaded trusted root v%d", new_root.signed.version)
|
||||
|
|
|
|||
|
|
@ -68,7 +68,7 @@
|
|||
from securesystemslib import util as sslib_util
|
||||
|
||||
from tuf import exceptions
|
||||
from tuf.api.metadata import Metadata, TargetFile, Targets
|
||||
from tuf.api.metadata import Metadata, Root, Snapshot, TargetFile, Targets, Timestamp
|
||||
from tuf.ngclient._internal import requests_fetcher, trusted_metadata_set
|
||||
from tuf.ngclient.config import UpdaterConfig
|
||||
from tuf.ngclient.fetcher import FetcherInterface
|
||||
|
|
@ -114,7 +114,7 @@ def __init__(
|
|||
self._target_base_url = _ensure_trailing_slash(target_base_url)
|
||||
|
||||
# Read trusted local root metadata
|
||||
data = self._load_local_metadata("root")
|
||||
data = self._load_local_metadata(Root.type)
|
||||
self._trusted_set = trusted_metadata_set.TrustedMetadataSet(data)
|
||||
self._fetcher = fetcher or requests_fetcher.RequestsFetcher()
|
||||
self.config = config or UpdaterConfig()
|
||||
|
|
@ -146,7 +146,7 @@ def refresh(self) -> None:
|
|||
self._load_root()
|
||||
self._load_timestamp()
|
||||
self._load_snapshot()
|
||||
self._load_targets("targets", "root")
|
||||
self._load_targets(Targets.type, Root.type)
|
||||
|
||||
def _generate_target_file_path(self, targetinfo: TargetFile) -> str:
|
||||
if self.target_dir is None:
|
||||
|
|
@ -320,10 +320,12 @@ def _load_root(self) -> None:
|
|||
for next_version in range(lower_bound, upper_bound):
|
||||
try:
|
||||
data = self._download_metadata(
|
||||
"root", self.config.root_max_length, next_version
|
||||
Root.type,
|
||||
self.config.root_max_length,
|
||||
next_version,
|
||||
)
|
||||
self._trusted_set.update_root(data)
|
||||
self._persist_metadata("root", data)
|
||||
self._persist_metadata(Root.type, data)
|
||||
|
||||
except exceptions.FetcherHTTPError as exception:
|
||||
if exception.status_code not in {403, 404}:
|
||||
|
|
@ -334,7 +336,7 @@ def _load_root(self) -> None:
|
|||
def _load_timestamp(self) -> None:
|
||||
"""Load local and remote timestamp metadata"""
|
||||
try:
|
||||
data = self._load_local_metadata("timestamp")
|
||||
data = self._load_local_metadata(Timestamp.type)
|
||||
self._trusted_set.update_timestamp(data)
|
||||
except (OSError, exceptions.RepositoryError) as e:
|
||||
# Local timestamp does not exist or is invalid
|
||||
|
|
@ -342,15 +344,15 @@ def _load_timestamp(self) -> None:
|
|||
|
||||
# Load from remote (whether local load succeeded or not)
|
||||
data = self._download_metadata(
|
||||
"timestamp", self.config.timestamp_max_length
|
||||
Timestamp.type, self.config.timestamp_max_length
|
||||
)
|
||||
self._trusted_set.update_timestamp(data)
|
||||
self._persist_metadata("timestamp", data)
|
||||
self._persist_metadata(Timestamp.type, data)
|
||||
|
||||
def _load_snapshot(self) -> None:
|
||||
"""Load local (and if needed remote) snapshot metadata"""
|
||||
try:
|
||||
data = self._load_local_metadata("snapshot")
|
||||
data = self._load_local_metadata(Snapshot.type)
|
||||
self._trusted_set.update_snapshot(data, trusted=True)
|
||||
logger.debug("Local snapshot is valid: not downloading new one")
|
||||
except (OSError, exceptions.RepositoryError) as e:
|
||||
|
|
@ -364,9 +366,9 @@ def _load_snapshot(self) -> None:
|
|||
if self._trusted_set.root.signed.consistent_snapshot:
|
||||
version = snapshot_meta.version
|
||||
|
||||
data = self._download_metadata("snapshot", length, version)
|
||||
data = self._download_metadata(Snapshot.type, length, version)
|
||||
self._trusted_set.update_snapshot(data)
|
||||
self._persist_metadata("snapshot", data)
|
||||
self._persist_metadata(Snapshot.type, data)
|
||||
|
||||
def _load_targets(self, role: str, parent_role: str) -> Metadata[Targets]:
|
||||
"""Load local (and if needed remote) metadata for 'role'."""
|
||||
|
|
@ -412,7 +414,7 @@ def _preorder_depth_first_walk(
|
|||
|
||||
# List of delegations to be interrogated. A (role, parent role) pair
|
||||
# is needed to load and verify the delegated targets metadata.
|
||||
delegations_to_visit = [("targets", "root")]
|
||||
delegations_to_visit = [(Targets.type, Root.type)]
|
||||
visited_role_names: Set[str] = set()
|
||||
number_of_delegations = self.config.max_delegations
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue