Use Metadata.verify_delegate

Replace the helper verify_with_threshold() with
the newly implemented method Metadata.verify_delegate.

Add an aditional type annotation helping the IDE.

Signed-off-by: Teodora Sechkova <tsechkova@vmware.com>
This commit is contained in:
Teodora Sechkova 2021-07-23 16:55:06 +03:00
parent 8e9677d262
commit d70f69771d
No known key found for this signature in database
GPG key ID: 65F78F613EA1914E

View file

@ -71,47 +71,11 @@
from typing import Dict, Iterator, Optional
from tuf import exceptions
from tuf.api.metadata import Metadata, Root, Targets
from tuf.api.metadata import Metadata
from tuf.api.serialization import DeserializationError
logger = logging.getLogger(__name__)
# This is a placeholder until ...
# TODO issue 1306: implement this in Metadata API
def verify_with_threshold(
delegator: Metadata, role_name: str, unverified: Metadata
) -> bool:
"""Verify 'unverified' with keys and threshold defined in delegator"""
role = None
keys = {}
if isinstance(delegator.signed, Root):
keys = delegator.signed.keys
role = delegator.signed.roles.get(role_name)
elif isinstance(delegator.signed, Targets):
if delegator.signed.delegations:
keys = delegator.signed.delegations.keys
# role names are unique: first match is enough
roles = delegator.signed.delegations.roles
role = next((r for r in roles if r.name == role_name), None)
else:
raise ValueError("Call is valid only on delegator metadata")
if role is None:
raise ValueError(f"Delegated role {role_name} not found")
# verify that delegate is signed by correct threshold of unique keys
unique_keys = set()
for keyid in role.keyids:
key = keys[keyid]
try:
key.verify_signature(unverified)
unique_keys.add(key.keyval["public"])
except Exception as e: # pylint: disable=broad-except
# TODO specify the Exceptions (see issue #1351)
logger.info("verify failed: %s", e)
return len(unique_keys) >= role.threshold
class TrustedMetadataSet(abc.Mapping):
"""Internal class to keep track of trusted metadata in Updater
@ -207,20 +171,14 @@ def update_root(self, data: bytes):
if self.root is not None:
# We are not loading initial trusted root: verify the new one
if not verify_with_threshold(self.root, "root", new_root):
raise exceptions.UnsignedMetadataError(
"New root is not signed by root", new_root.signed
)
self.root.verify_delegate("root", new_root)
if new_root.signed.version != self.root.signed.version + 1:
raise exceptions.ReplayedMetadataError(
"root", new_root.signed.version, self.root.signed.version
)
if not verify_with_threshold(new_root, "root", new_root):
raise exceptions.UnsignedMetadataError(
"New root is not signed by itself", new_root.signed
)
new_root.verify_delegate("root", new_root)
self._trusted_set["root"] = new_root
logger.debug("Updated root")
@ -270,10 +228,7 @@ def update_timestamp(self, data: bytes):
f"Expected 'timestamp', got '{new_timestamp.signed.type}'"
)
if not verify_with_threshold(self.root, "timestamp", new_timestamp):
raise exceptions.UnsignedMetadataError(
"New timestamp is not signed by root", new_timestamp.signed
)
self.root.verify_delegate("timestamp", new_timestamp)
# If an existing trusted timestamp is updated,
# check for a rollback attack
@ -339,10 +294,7 @@ def update_snapshot(self, data: bytes):
f"Expected 'snapshot', got '{new_snapshot.signed.type}'"
)
if not verify_with_threshold(self.root, "snapshot", new_snapshot):
raise exceptions.UnsignedMetadataError(
"New snapshot is not signed by root", new_snapshot.signed
)
self.root.verify_delegate("snapshot", new_snapshot)
if (
new_snapshot.signed.version
@ -408,7 +360,7 @@ def update_delegated_targets(
if self.snapshot is None:
raise RuntimeError("Cannot load targets before snapshot")
delegator = self.get(delegator_name)
delegator: Optional[Metadata] = self.get(delegator_name)
if delegator is None:
raise RuntimeError("Cannot load targets before delegator")
@ -438,11 +390,7 @@ def update_delegated_targets(
f"Expected 'targets', got '{new_delegate.signed.type}'"
)
if not verify_with_threshold(delegator, role_name, new_delegate):
raise exceptions.UnsignedMetadataError(
f"New {role_name} is not signed by {delegator_name}",
new_delegate,
)
delegator.verify_delegate(role_name, new_delegate)
if new_delegate.signed.version != meta.version:
raise exceptions.BadVersionNumberError(