mirror of
https://github.com/theupdateframework/python-tuf
synced 2026-05-24 10:08:28 +00:00
Metadata API: Implement threshold verification
The delegating Metadata (root or targets) verifies that the delegated metadata is signed by required threshold of keys for the delegated role. Calling the function on non-delegator-metadata or giving a rolename that is not actually delegated by the delegator is considered a programming error and ValueError is raised. If the threshold is not reached, UnsignedMetadataError is raised. Tweak type annotation of Delegations.keys to match the one for Root.keys (so they can be assigned to same local variable). Signed-off-by: Jussi Kukkonen <jkukkonen@vmware.com>
This commit is contained in:
parent
745a8f7680
commit
37a4d41aad
2 changed files with 105 additions and 1 deletions
|
|
@ -333,6 +333,55 @@ def test_metadata_timestamp(self):
|
|||
)
|
||||
|
||||
|
||||
def test_metadata_verify_delegate(self):
|
||||
root_path = os.path.join(self.repo_dir, 'metadata', 'root.json')
|
||||
root = Metadata.from_file(root_path)
|
||||
snapshot_path = os.path.join(
|
||||
self.repo_dir, 'metadata', 'snapshot.json')
|
||||
snapshot = Metadata.from_file(snapshot_path)
|
||||
targets_path = os.path.join(
|
||||
self.repo_dir, 'metadata', 'targets.json')
|
||||
targets = Metadata.from_file(targets_path)
|
||||
role1_path = os.path.join(
|
||||
self.repo_dir, 'metadata', 'role1.json')
|
||||
role1 = Metadata.from_file(role1_path)
|
||||
role2_path = os.path.join(
|
||||
self.repo_dir, 'metadata', 'role2.json')
|
||||
role2 = Metadata.from_file(role2_path)
|
||||
|
||||
# test the expected delegation tree
|
||||
root.verify_delegate('root', root)
|
||||
root.verify_delegate('snapshot', snapshot)
|
||||
root.verify_delegate('targets', targets)
|
||||
targets.verify_delegate('role1', role1)
|
||||
role1.verify_delegate('role2', role2)
|
||||
|
||||
# only root and targets can verify delegates
|
||||
with self.assertRaises(ValueError):
|
||||
snapshot.verify_delegate('snapshot', snapshot)
|
||||
# verify fails for roles that are not delegated by delegator
|
||||
with self.assertRaises(ValueError):
|
||||
root.verify_delegate('role1', role1)
|
||||
with self.assertRaises(ValueError):
|
||||
targets.verify_delegate('targets', targets)
|
||||
|
||||
# verify fails when delegate content is modified
|
||||
expires = snapshot.signed.expires
|
||||
snapshot.signed.bump_expiration()
|
||||
with self.assertRaises(exceptions.UnsignedMetadataError):
|
||||
root.verify_delegate('snapshot', snapshot)
|
||||
snapshot.signed.expires = expires
|
||||
|
||||
# verify fails if roles keys do not sign the metadata
|
||||
with self.assertRaises(exceptions.UnsignedMetadataError):
|
||||
root.verify_delegate('timestamp', snapshot)
|
||||
|
||||
# verify fails if threshold of signatures is not reached
|
||||
root.signed.roles['snapshot'].threshold = 2
|
||||
with self.assertRaises(exceptions.UnsignedMetadataError):
|
||||
root.verify_delegate('snapshot', snapshot)
|
||||
|
||||
# TODO test successful verify with higher thresholds
|
||||
|
||||
def test_key_class(self):
|
||||
keys = {
|
||||
|
|
|
|||
|
|
@ -266,6 +266,61 @@ def sign(
|
|||
|
||||
return signature
|
||||
|
||||
def verify_delegate(
|
||||
self,
|
||||
role_name: str,
|
||||
delegate: "Metadata",
|
||||
signed_serializer: Optional[SignedSerializer] = None,
|
||||
):
|
||||
"""Verifies that 'delegate' is signed with the required threshold of
|
||||
keys for the delegated role 'role_name'.
|
||||
|
||||
Args:
|
||||
role_name: Name of the delegated role to verify
|
||||
delegate: The Metadata object for the delegated role
|
||||
signed_serializer: Optional; serializer used for delegate
|
||||
serialization. Default is CanonicalJSONSerializer.
|
||||
|
||||
Raises:
|
||||
UnsignedMetadataError: 'delegate' was not signed with required
|
||||
threshold of keys for 'role_name'
|
||||
"""
|
||||
|
||||
# Find the keys and role in our metadata
|
||||
role = None
|
||||
if isinstance(self.signed, Root):
|
||||
keys = self.signed.keys
|
||||
role = self.signed.roles.get(role_name)
|
||||
elif isinstance(self.signed, Targets):
|
||||
if self.signed.delegations:
|
||||
keys = self.signed.delegations.keys
|
||||
# Assume role names are unique in delegations.roles: #1426
|
||||
roles = self.signed.delegations.roles
|
||||
role = next((r for r in roles if r.name == role_name), None)
|
||||
else:
|
||||
raise ValueError("Call is valid only on delegator metadata")
|
||||
|
||||
if role is None:
|
||||
raise ValueError(f"No delegation found for {role_name}")
|
||||
|
||||
# verify that delegate is signed by required threshold of unique keys
|
||||
signing_keys = set()
|
||||
for keyid in role.keyids:
|
||||
key = keys[keyid]
|
||||
try:
|
||||
key.verify_signature(delegate, signed_serializer)
|
||||
# keyids are unique. Try to make sure the public keys are too
|
||||
signing_keys.add(key.keyval["public"])
|
||||
except exceptions.UnsignedMetadataError:
|
||||
pass
|
||||
|
||||
if len(signing_keys) < role.threshold:
|
||||
raise exceptions.UnsignedMetadataError(
|
||||
f"{role_name} was signed by {len(signing_keys)}/"
|
||||
f"{role.threshold} keys",
|
||||
delegate.signed,
|
||||
)
|
||||
|
||||
|
||||
class Signed(metaclass=abc.ABCMeta):
|
||||
"""A base class for the signed part of TUF metadata.
|
||||
|
|
@ -966,7 +1021,7 @@ class Delegations:
|
|||
|
||||
def __init__(
|
||||
self,
|
||||
keys: Mapping[str, Key],
|
||||
keys: Dict[str, Key],
|
||||
roles: List[DelegatedRole],
|
||||
unrecognized_fields: Optional[Mapping[str, Any]] = None,
|
||||
) -> None:
|
||||
|
|
|
|||
Loading…
Reference in a new issue