diff --git a/tests/test_api.py b/tests/test_api.py index 9799fb0a..c8b4325d 100755 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -333,6 +333,55 @@ def test_metadata_timestamp(self): ) + def test_metadata_verify_delegate(self): + root_path = os.path.join(self.repo_dir, 'metadata', 'root.json') + root = Metadata.from_file(root_path) + snapshot_path = os.path.join( + self.repo_dir, 'metadata', 'snapshot.json') + snapshot = Metadata.from_file(snapshot_path) + targets_path = os.path.join( + self.repo_dir, 'metadata', 'targets.json') + targets = Metadata.from_file(targets_path) + role1_path = os.path.join( + self.repo_dir, 'metadata', 'role1.json') + role1 = Metadata.from_file(role1_path) + role2_path = os.path.join( + self.repo_dir, 'metadata', 'role2.json') + role2 = Metadata.from_file(role2_path) + + # test the expected delegation tree + root.verify_delegate('root', root) + root.verify_delegate('snapshot', snapshot) + root.verify_delegate('targets', targets) + targets.verify_delegate('role1', role1) + role1.verify_delegate('role2', role2) + + # only root and targets can verify delegates + with self.assertRaises(ValueError): + snapshot.verify_delegate('snapshot', snapshot) + # verify fails for roles that are not delegated by delegator + with self.assertRaises(ValueError): + root.verify_delegate('role1', role1) + with self.assertRaises(ValueError): + targets.verify_delegate('targets', targets) + + # verify fails when delegate content is modified + expires = snapshot.signed.expires + snapshot.signed.bump_expiration() + with self.assertRaises(exceptions.UnsignedMetadataError): + root.verify_delegate('snapshot', snapshot) + snapshot.signed.expires = expires + + # verify fails if roles keys do not sign the metadata + with self.assertRaises(exceptions.UnsignedMetadataError): + root.verify_delegate('timestamp', snapshot) + + # verify fails if threshold of signatures is not reached + root.signed.roles['snapshot'].threshold = 2 + with self.assertRaises(exceptions.UnsignedMetadataError): + root.verify_delegate('snapshot', snapshot) + + # TODO test successful verify with higher thresholds def test_key_class(self): keys = { diff --git a/tuf/api/metadata.py b/tuf/api/metadata.py index 50a6c318..e9e78a0c 100644 --- a/tuf/api/metadata.py +++ b/tuf/api/metadata.py @@ -266,6 +266,61 @@ def sign( return signature + def verify_delegate( + self, + role_name: str, + delegate: "Metadata", + signed_serializer: Optional[SignedSerializer] = None, + ): + """Verifies that 'delegate' is signed with the required threshold of + keys for the delegated role 'role_name'. + + Args: + role_name: Name of the delegated role to verify + delegate: The Metadata object for the delegated role + signed_serializer: Optional; serializer used for delegate + serialization. Default is CanonicalJSONSerializer. + + Raises: + UnsignedMetadataError: 'delegate' was not signed with required + threshold of keys for 'role_name' + """ + + # Find the keys and role in our metadata + role = None + if isinstance(self.signed, Root): + keys = self.signed.keys + role = self.signed.roles.get(role_name) + elif isinstance(self.signed, Targets): + if self.signed.delegations: + keys = self.signed.delegations.keys + # Assume role names are unique in delegations.roles: #1426 + roles = self.signed.delegations.roles + role = next((r for r in roles if r.name == role_name), None) + else: + raise ValueError("Call is valid only on delegator metadata") + + if role is None: + raise ValueError(f"No delegation found for {role_name}") + + # verify that delegate is signed by required threshold of unique keys + signing_keys = set() + for keyid in role.keyids: + key = keys[keyid] + try: + key.verify_signature(delegate, signed_serializer) + # keyids are unique. Try to make sure the public keys are too + signing_keys.add(key.keyval["public"]) + except exceptions.UnsignedMetadataError: + pass + + if len(signing_keys) < role.threshold: + raise exceptions.UnsignedMetadataError( + f"{role_name} was signed by {len(signing_keys)}/" + f"{role.threshold} keys", + delegate.signed, + ) + class Signed(metaclass=abc.ABCMeta): """A base class for the signed part of TUF metadata. @@ -966,7 +1021,7 @@ class Delegations: def __init__( self, - keys: Mapping[str, Key], + keys: Dict[str, Key], roles: List[DelegatedRole], unrecognized_fields: Optional[Mapping[str, Any]] = None, ) -> None: