mirror of
https://github.com/theupdateframework/python-tuf
synced 2026-05-24 10:08:28 +00:00
Merge pull request #1509 from sechkova/use_verify_with_threshold
ngclient: use Metadata.verify_delegate
This commit is contained in:
commit
98cc149489
1 changed files with 7 additions and 59 deletions
|
|
@ -71,47 +71,11 @@
|
|||
from typing import Dict, Iterator, Optional
|
||||
|
||||
from tuf import exceptions
|
||||
from tuf.api.metadata import Metadata, Root, Targets
|
||||
from tuf.api.metadata import Metadata
|
||||
from tuf.api.serialization import DeserializationError
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# This is a placeholder until ...
|
||||
# TODO issue 1306: implement this in Metadata API
|
||||
def verify_with_threshold(
|
||||
delegator: Metadata, role_name: str, unverified: Metadata
|
||||
) -> bool:
|
||||
"""Verify 'unverified' with keys and threshold defined in delegator"""
|
||||
role = None
|
||||
keys = {}
|
||||
if isinstance(delegator.signed, Root):
|
||||
keys = delegator.signed.keys
|
||||
role = delegator.signed.roles.get(role_name)
|
||||
elif isinstance(delegator.signed, Targets):
|
||||
if delegator.signed.delegations:
|
||||
keys = delegator.signed.delegations.keys
|
||||
# role names are unique: first match is enough
|
||||
roles = delegator.signed.delegations.roles
|
||||
role = next((r for r in roles if r.name == role_name), None)
|
||||
else:
|
||||
raise ValueError("Call is valid only on delegator metadata")
|
||||
|
||||
if role is None:
|
||||
raise ValueError(f"Delegated role {role_name} not found")
|
||||
|
||||
# verify that delegate is signed by correct threshold of unique keys
|
||||
unique_keys = set()
|
||||
for keyid in role.keyids:
|
||||
key = keys[keyid]
|
||||
try:
|
||||
key.verify_signature(unverified)
|
||||
unique_keys.add(key.keyval["public"])
|
||||
except Exception as e: # pylint: disable=broad-except
|
||||
# TODO specify the Exceptions (see issue #1351)
|
||||
logger.info("verify failed: %s", e)
|
||||
|
||||
return len(unique_keys) >= role.threshold
|
||||
|
||||
|
||||
class TrustedMetadataSet(abc.Mapping):
|
||||
"""Internal class to keep track of trusted metadata in Updater
|
||||
|
|
@ -207,20 +171,14 @@ def update_root(self, data: bytes):
|
|||
|
||||
if self.root is not None:
|
||||
# We are not loading initial trusted root: verify the new one
|
||||
if not verify_with_threshold(self.root, "root", new_root):
|
||||
raise exceptions.UnsignedMetadataError(
|
||||
"New root is not signed by root", new_root.signed
|
||||
)
|
||||
self.root.verify_delegate("root", new_root)
|
||||
|
||||
if new_root.signed.version != self.root.signed.version + 1:
|
||||
raise exceptions.ReplayedMetadataError(
|
||||
"root", new_root.signed.version, self.root.signed.version
|
||||
)
|
||||
|
||||
if not verify_with_threshold(new_root, "root", new_root):
|
||||
raise exceptions.UnsignedMetadataError(
|
||||
"New root is not signed by itself", new_root.signed
|
||||
)
|
||||
new_root.verify_delegate("root", new_root)
|
||||
|
||||
self._trusted_set["root"] = new_root
|
||||
logger.debug("Updated root")
|
||||
|
|
@ -270,10 +228,7 @@ def update_timestamp(self, data: bytes):
|
|||
f"Expected 'timestamp', got '{new_timestamp.signed.type}'"
|
||||
)
|
||||
|
||||
if not verify_with_threshold(self.root, "timestamp", new_timestamp):
|
||||
raise exceptions.UnsignedMetadataError(
|
||||
"New timestamp is not signed by root", new_timestamp.signed
|
||||
)
|
||||
self.root.verify_delegate("timestamp", new_timestamp)
|
||||
|
||||
# If an existing trusted timestamp is updated,
|
||||
# check for a rollback attack
|
||||
|
|
@ -339,10 +294,7 @@ def update_snapshot(self, data: bytes):
|
|||
f"Expected 'snapshot', got '{new_snapshot.signed.type}'"
|
||||
)
|
||||
|
||||
if not verify_with_threshold(self.root, "snapshot", new_snapshot):
|
||||
raise exceptions.UnsignedMetadataError(
|
||||
"New snapshot is not signed by root", new_snapshot.signed
|
||||
)
|
||||
self.root.verify_delegate("snapshot", new_snapshot)
|
||||
|
||||
if (
|
||||
new_snapshot.signed.version
|
||||
|
|
@ -408,7 +360,7 @@ def update_delegated_targets(
|
|||
if self.snapshot is None:
|
||||
raise RuntimeError("Cannot load targets before snapshot")
|
||||
|
||||
delegator = self.get(delegator_name)
|
||||
delegator: Optional[Metadata] = self.get(delegator_name)
|
||||
if delegator is None:
|
||||
raise RuntimeError("Cannot load targets before delegator")
|
||||
|
||||
|
|
@ -438,11 +390,7 @@ def update_delegated_targets(
|
|||
f"Expected 'targets', got '{new_delegate.signed.type}'"
|
||||
)
|
||||
|
||||
if not verify_with_threshold(delegator, role_name, new_delegate):
|
||||
raise exceptions.UnsignedMetadataError(
|
||||
f"New {role_name} is not signed by {delegator_name}",
|
||||
new_delegate,
|
||||
)
|
||||
delegator.verify_delegate(role_name, new_delegate)
|
||||
|
||||
if new_delegate.signed.version != meta.version:
|
||||
raise exceptions.BadVersionNumberError(
|
||||
|
|
|
|||
Loading…
Reference in a new issue