From 37a4d41aadcb532bec08907398e4e2c74c76b329 Mon Sep 17 00:00:00 2001 From: Jussi Kukkonen Date: Thu, 22 Apr 2021 19:37:38 +0300 Subject: [PATCH] Metadata API: Implement threshold verification The delegating Metadata (root or targets) verifies that the delegated metadata is signed by required threshold of keys for the delegated role. Calling the function on non-delegator-metadata or giving a rolename that is not actually delegated by the delegator is considered a programming error and ValueError is raised. If the threshold is not reached, UnsignedMetadataError is raised. Tweak type annotation of Delegations.keys to match the one for Root.keys (so they can be assigned to same local variable). Signed-off-by: Jussi Kukkonen --- tests/test_api.py | 49 ++++++++++++++++++++++++++++++++++++++ tuf/api/metadata.py | 57 ++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 105 insertions(+), 1 deletion(-) diff --git a/tests/test_api.py b/tests/test_api.py index 9799fb0a..c8b4325d 100755 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -333,6 +333,55 @@ def test_metadata_timestamp(self): ) + def test_metadata_verify_delegate(self): + root_path = os.path.join(self.repo_dir, 'metadata', 'root.json') + root = Metadata.from_file(root_path) + snapshot_path = os.path.join( + self.repo_dir, 'metadata', 'snapshot.json') + snapshot = Metadata.from_file(snapshot_path) + targets_path = os.path.join( + self.repo_dir, 'metadata', 'targets.json') + targets = Metadata.from_file(targets_path) + role1_path = os.path.join( + self.repo_dir, 'metadata', 'role1.json') + role1 = Metadata.from_file(role1_path) + role2_path = os.path.join( + self.repo_dir, 'metadata', 'role2.json') + role2 = Metadata.from_file(role2_path) + + # test the expected delegation tree + root.verify_delegate('root', root) + root.verify_delegate('snapshot', snapshot) + root.verify_delegate('targets', targets) + targets.verify_delegate('role1', role1) + role1.verify_delegate('role2', role2) + + # only root and targets can verify delegates + with self.assertRaises(ValueError): + snapshot.verify_delegate('snapshot', snapshot) + # verify fails for roles that are not delegated by delegator + with self.assertRaises(ValueError): + root.verify_delegate('role1', role1) + with self.assertRaises(ValueError): + targets.verify_delegate('targets', targets) + + # verify fails when delegate content is modified + expires = snapshot.signed.expires + snapshot.signed.bump_expiration() + with self.assertRaises(exceptions.UnsignedMetadataError): + root.verify_delegate('snapshot', snapshot) + snapshot.signed.expires = expires + + # verify fails if roles keys do not sign the metadata + with self.assertRaises(exceptions.UnsignedMetadataError): + root.verify_delegate('timestamp', snapshot) + + # verify fails if threshold of signatures is not reached + root.signed.roles['snapshot'].threshold = 2 + with self.assertRaises(exceptions.UnsignedMetadataError): + root.verify_delegate('snapshot', snapshot) + + # TODO test successful verify with higher thresholds def test_key_class(self): keys = { diff --git a/tuf/api/metadata.py b/tuf/api/metadata.py index 50a6c318..e9e78a0c 100644 --- a/tuf/api/metadata.py +++ b/tuf/api/metadata.py @@ -266,6 +266,61 @@ def sign( return signature + def verify_delegate( + self, + role_name: str, + delegate: "Metadata", + signed_serializer: Optional[SignedSerializer] = None, + ): + """Verifies that 'delegate' is signed with the required threshold of + keys for the delegated role 'role_name'. + + Args: + role_name: Name of the delegated role to verify + delegate: The Metadata object for the delegated role + signed_serializer: Optional; serializer used for delegate + serialization. Default is CanonicalJSONSerializer. + + Raises: + UnsignedMetadataError: 'delegate' was not signed with required + threshold of keys for 'role_name' + """ + + # Find the keys and role in our metadata + role = None + if isinstance(self.signed, Root): + keys = self.signed.keys + role = self.signed.roles.get(role_name) + elif isinstance(self.signed, Targets): + if self.signed.delegations: + keys = self.signed.delegations.keys + # Assume role names are unique in delegations.roles: #1426 + roles = self.signed.delegations.roles + role = next((r for r in roles if r.name == role_name), None) + else: + raise ValueError("Call is valid only on delegator metadata") + + if role is None: + raise ValueError(f"No delegation found for {role_name}") + + # verify that delegate is signed by required threshold of unique keys + signing_keys = set() + for keyid in role.keyids: + key = keys[keyid] + try: + key.verify_signature(delegate, signed_serializer) + # keyids are unique. Try to make sure the public keys are too + signing_keys.add(key.keyval["public"]) + except exceptions.UnsignedMetadataError: + pass + + if len(signing_keys) < role.threshold: + raise exceptions.UnsignedMetadataError( + f"{role_name} was signed by {len(signing_keys)}/" + f"{role.threshold} keys", + delegate.signed, + ) + class Signed(metaclass=abc.ABCMeta): """A base class for the signed part of TUF metadata. @@ -966,7 +1021,7 @@ class Delegations: def __init__( self, - keys: Mapping[str, Key], + keys: Dict[str, Key], roles: List[DelegatedRole], unrecognized_fields: Optional[Mapping[str, Any]] = None, ) -> None: