mirror of
https://github.com/theupdateframework/python-tuf
synced 2026-05-24 10:08:28 +00:00
Merge pull request #1680 from ivanayov/ivanayov/ngclient_loaded_metadata
Update ngclient to return loaded metadata
This commit is contained in:
commit
acb201d6cc
3 changed files with 60 additions and 15 deletions
|
|
@ -129,6 +129,22 @@ def test_update(self):
|
|||
|
||||
self.assertTrue(count, 6)
|
||||
|
||||
def test_update_metadata_output(self):
|
||||
timestamp = self.trusted_set.update_timestamp(self.metadata["timestamp"])
|
||||
snapshot = self.trusted_set.update_snapshot(self.metadata["snapshot"])
|
||||
targets = self.trusted_set.update_targets(self.metadata["targets"])
|
||||
delegeted_targets_1 = self.trusted_set.update_delegated_targets(
|
||||
self.metadata["role1"], "role1", "targets"
|
||||
)
|
||||
delegeted_targets_2 = self.trusted_set.update_delegated_targets(
|
||||
self.metadata["role2"], "role2", "role1"
|
||||
)
|
||||
self.assertIsInstance(timestamp.signed, Timestamp)
|
||||
self.assertIsInstance(snapshot.signed, Snapshot)
|
||||
self.assertIsInstance(targets.signed, Targets)
|
||||
self.assertIsInstance(delegeted_targets_1.signed, Targets)
|
||||
self.assertIsInstance(delegeted_targets_2.signed, Targets)
|
||||
|
||||
def test_out_of_order_ops(self):
|
||||
# Update snapshot before timestamp
|
||||
with self.assertRaises(RuntimeError):
|
||||
|
|
|
|||
|
|
@ -141,7 +141,7 @@ def targets(self) -> Optional[Metadata[Targets]]:
|
|||
return self._trusted_set.get("targets")
|
||||
|
||||
# Methods for updating metadata
|
||||
def update_root(self, data: bytes) -> None:
|
||||
def update_root(self, data: bytes) -> Metadata[Root]:
|
||||
"""Verifies and loads 'data' as new root metadata.
|
||||
|
||||
Note that an expired intermediate root is considered valid: expiry is
|
||||
|
|
@ -153,6 +153,9 @@ def update_root(self, data: bytes) -> None:
|
|||
Raises:
|
||||
RepositoryError: Metadata failed to load or verify. The actual
|
||||
error type and content will contain more details.
|
||||
|
||||
Returns:
|
||||
Deserialized and verified root Metadata object
|
||||
"""
|
||||
if self.timestamp is not None:
|
||||
raise RuntimeError("Cannot update root after timestamp")
|
||||
|
|
@ -182,7 +185,9 @@ def update_root(self, data: bytes) -> None:
|
|||
self._trusted_set["root"] = new_root
|
||||
logger.info("Updated root v%d", new_root.signed.version)
|
||||
|
||||
def update_timestamp(self, data: bytes) -> None:
|
||||
return new_root
|
||||
|
||||
def update_timestamp(self, data: bytes) -> Metadata[Timestamp]:
|
||||
"""Verifies and loads 'data' as new timestamp metadata.
|
||||
|
||||
Note that an intermediate timestamp is allowed to be expired:
|
||||
|
|
@ -199,6 +204,9 @@ def update_timestamp(self, data: bytes) -> None:
|
|||
RepositoryError: Metadata failed to load or verify as final
|
||||
timestamp. The actual error type and content will contain
|
||||
more details.
|
||||
|
||||
Returns:
|
||||
Deserialized and verified timestamp Metadata object
|
||||
"""
|
||||
if self.snapshot is not None:
|
||||
raise RuntimeError("Cannot update timestamp after snapshot")
|
||||
|
|
@ -251,6 +259,8 @@ def update_timestamp(self, data: bytes) -> None:
|
|||
# timestamp is loaded: raise if it is not valid _final_ timestamp
|
||||
self._check_final_timestamp()
|
||||
|
||||
return new_timestamp
|
||||
|
||||
def _check_final_timestamp(self) -> None:
|
||||
"""Raise if timestamp is expired"""
|
||||
|
||||
|
|
@ -260,7 +270,7 @@ def _check_final_timestamp(self) -> None:
|
|||
|
||||
def update_snapshot(
|
||||
self, data: bytes, trusted: Optional[bool] = False
|
||||
) -> None:
|
||||
) -> Metadata[Snapshot]:
|
||||
"""Verifies and loads 'data' as new snapshot metadata.
|
||||
|
||||
Note that an intermediate snapshot is allowed to be expired and version
|
||||
|
|
@ -282,6 +292,9 @@ def update_snapshot(
|
|||
Raises:
|
||||
RepositoryError: data failed to load or verify as final snapshot.
|
||||
The actual error type and content will contain more details.
|
||||
|
||||
Returns:
|
||||
Deserialized and verified snapshot Metadata object
|
||||
"""
|
||||
|
||||
if self.timestamp is None:
|
||||
|
|
@ -347,6 +360,8 @@ def update_snapshot(
|
|||
# snapshot is loaded, but we raise if it's not valid _final_ snapshot
|
||||
self._check_final_snapshot()
|
||||
|
||||
return new_snapshot
|
||||
|
||||
def _check_final_snapshot(self) -> None:
|
||||
"""Raise if snapshot is expired or meta version does not match"""
|
||||
|
||||
|
|
@ -361,7 +376,7 @@ def _check_final_snapshot(self) -> None:
|
|||
f"got {self.snapshot.signed.version}"
|
||||
)
|
||||
|
||||
def update_targets(self, data: bytes) -> None:
|
||||
def update_targets(self, data: bytes) -> Metadata[Targets]:
|
||||
"""Verifies and loads 'data' as new top-level targets metadata.
|
||||
|
||||
Args:
|
||||
|
|
@ -370,12 +385,15 @@ def update_targets(self, data: bytes) -> None:
|
|||
Raises:
|
||||
RepositoryError: Metadata failed to load or verify. The actual
|
||||
error type and content will contain more details.
|
||||
|
||||
Returns:
|
||||
Deserialized and verified targets Metadata object
|
||||
"""
|
||||
self.update_delegated_targets(data, "targets", "root")
|
||||
return self.update_delegated_targets(data, "targets", "root")
|
||||
|
||||
def update_delegated_targets(
|
||||
self, data: bytes, role_name: str, delegator_name: str
|
||||
) -> None:
|
||||
) -> Metadata[Targets]:
|
||||
"""Verifies and loads 'data' as new metadata for target 'role_name'.
|
||||
|
||||
Args:
|
||||
|
|
@ -386,6 +404,9 @@ def update_delegated_targets(
|
|||
Raises:
|
||||
RepositoryError: Metadata failed to load or verify. The actual
|
||||
error type and content will contain more details.
|
||||
|
||||
Returns:
|
||||
Deserialized and verified targets Metadata object
|
||||
"""
|
||||
if self.snapshot is None:
|
||||
raise RuntimeError("Cannot load targets before snapshot")
|
||||
|
|
@ -438,6 +459,8 @@ def update_delegated_targets(
|
|||
self._trusted_set[role_name] = new_delegate
|
||||
logger.info("Updated %s v%d", role_name, version)
|
||||
|
||||
return new_delegate
|
||||
|
||||
def _load_trusted_root(self, data: bytes) -> None:
|
||||
"""Verifies and loads 'data' as trusted root metadata.
|
||||
|
||||
|
|
|
|||
|
|
@ -68,7 +68,7 @@
|
|||
from securesystemslib import util as sslib_util
|
||||
|
||||
from tuf import exceptions
|
||||
from tuf.api.metadata import TargetFile, Targets
|
||||
from tuf.api.metadata import Metadata, TargetFile, Targets
|
||||
from tuf.ngclient._internal import requests_fetcher, trusted_metadata_set
|
||||
from tuf.ngclient.config import UpdaterConfig
|
||||
from tuf.ngclient.fetcher import FetcherInterface
|
||||
|
|
@ -368,12 +368,15 @@ def _load_snapshot(self) -> None:
|
|||
self._trusted_set.update_snapshot(data)
|
||||
self._persist_metadata("snapshot", data)
|
||||
|
||||
def _load_targets(self, role: str, parent_role: str) -> None:
|
||||
def _load_targets(self, role: str, parent_role: str) -> Metadata[Targets]:
|
||||
"""Load local (and if needed remote) metadata for 'role'."""
|
||||
try:
|
||||
data = self._load_local_metadata(role)
|
||||
self._trusted_set.update_delegated_targets(data, role, parent_role)
|
||||
delegated_targets = self._trusted_set.update_delegated_targets(
|
||||
data, role, parent_role
|
||||
)
|
||||
logger.debug("Local %s is valid: not downloading new one", role)
|
||||
return delegated_targets
|
||||
except (OSError, exceptions.RepositoryError) as e:
|
||||
# Local 'role' does not exist or is invalid: update from remote
|
||||
logger.debug("Failed to load local %s: %s", role, e)
|
||||
|
|
@ -386,9 +389,13 @@ def _load_targets(self, role: str, parent_role: str) -> None:
|
|||
version = metainfo.version
|
||||
|
||||
data = self._download_metadata(role, length, version)
|
||||
self._trusted_set.update_delegated_targets(data, role, parent_role)
|
||||
delegated_targets = self._trusted_set.update_delegated_targets(
|
||||
data, role, parent_role
|
||||
)
|
||||
self._persist_metadata(role, data)
|
||||
|
||||
return delegated_targets
|
||||
|
||||
def _preorder_depth_first_walk(
|
||||
self, target_filepath: str
|
||||
) -> Optional[TargetFile]:
|
||||
|
|
@ -417,10 +424,9 @@ def _preorder_depth_first_walk(
|
|||
|
||||
# The metadata for 'role_name' must be downloaded/updated before
|
||||
# its targets, delegations, and child roles can be inspected.
|
||||
self._load_targets(role_name, parent_role)
|
||||
targets = self._load_targets(role_name, parent_role).signed
|
||||
|
||||
role_metadata: Targets = self._trusted_set[role_name].signed
|
||||
target = role_metadata.targets.get(target_filepath)
|
||||
target = targets.targets.get(target_filepath)
|
||||
|
||||
if target is not None:
|
||||
logger.debug("Found target in current role %s", role_name)
|
||||
|
|
@ -432,11 +438,11 @@ def _preorder_depth_first_walk(
|
|||
# And also decrement number of visited roles.
|
||||
number_of_delegations -= 1
|
||||
|
||||
if role_metadata.delegations is not None:
|
||||
if targets.delegations is not None:
|
||||
child_roles_to_visit = []
|
||||
# NOTE: This may be a slow operation if there are many
|
||||
# delegated roles.
|
||||
for child_role in role_metadata.delegations.roles.values():
|
||||
for child_role in targets.delegations.roles.values():
|
||||
if child_role.is_delegated_path(target_filepath):
|
||||
logger.debug("Adding child role %s", child_role.name)
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue