From b8dbe307dbcbf1b3c927e536664d0c2fbe49d1ff Mon Sep 17 00:00:00 2001 From: Jussi Kukkonen Date: Sat, 3 Feb 2024 11:13:49 +0200 Subject: [PATCH] examples: Use verification results in repo example This is an example of using the verification resutls in a repository. The only remaining tricky part is in _get_verification_result(): * has to figure out the delegating metadata (something we currently cannot provide in repository.Repository for the general case) * Needs a special case for first root Signed-off-by: Jussi Kukkonen --- examples/repository/_simplerepo.py | 51 +++++++++++++++++++++++++----- 1 file changed, 43 insertions(+), 8 deletions(-) diff --git a/examples/repository/_simplerepo.py b/examples/repository/_simplerepo.py index ece4d99f..66891462 100644 --- a/examples/repository/_simplerepo.py +++ b/examples/repository/_simplerepo.py @@ -8,7 +8,7 @@ import logging from collections import defaultdict from datetime import datetime, timedelta -from typing import Dict, List +from typing import Dict, List, Union from securesystemslib import keys from securesystemslib.signer import Key, Signer, SSlibKey, SSlibSigner @@ -20,10 +20,13 @@ Metadata, MetaFile, Root, + RootVerificationResult, + Signed, Snapshot, TargetFile, Targets, Timestamp, + VerificationResult, ) from tuf.repository import Repository @@ -89,6 +92,27 @@ def targets_infos(self) -> Dict[str, MetaFile]: def snapshot_info(self) -> MetaFile: return self._snapshot_info + def _get_verification_result( + self, role: str, md: Metadata + ) -> Union[VerificationResult, RootVerificationResult]: + """Verify roles metadata using the existing repository metadata""" + if role == Root.type: + assert isinstance(md.signed, Root) + root = self.root() + if root.version == 0: + # special case first root + root = md.signed + return md.signed.get_root_verification_result( + root, md.signed_bytes, md.signatures + ) + if role in [Timestamp.type, Snapshot.type, Targets.type]: + delegator: Signed = self.root() + else: + delegator = self.targets() + return delegator.get_verification_result( + role, md.signed_bytes, md.signatures + ) + def open(self, role: str) -> Metadata: """Return current Metadata for role from 'storage' (or create a new one)""" @@ -112,6 +136,14 @@ def close(self, role: str, md: Metadata) -> None: for signer in self.signer_cache[role]: md.sign(signer, append=True) + # Double check that we only write verified metadata + vr = self._get_verification_result(role, md) + if not vr: + raise ValueError(f"Role {role} failed to verify") + keyids = [keyid[:7] for keyid in vr.signed] + verify_str = f"verified with keys [{', '.join(keyids)}]" + logger.debug("Role %s v%d: %s", role, md.signed.version, verify_str) + # store new metadata version, update version caches self.role_cache[role].append(md) if role == "snapshot": @@ -130,8 +162,6 @@ def add_target(self, path: str, content: str) -> None: with self.edit_targets() as targets: targets.targets[path] = TargetFile.from_data(path, data) - logger.debug("Targets v%d", targets.version) - # update snapshot, timestamp self.do_snapshot() self.do_timestamp() @@ -157,8 +187,6 @@ def submit_delegation(self, rolename: str, data: bytes) -> bool: logger.info("Failed to add delegation for %s: %s", rolename, e) return False - logger.debug("Targets v%d", targets.version) - # update snapshot, timestamp self.do_snapshot() self.do_timestamp() @@ -177,8 +205,6 @@ def submit_role(self, role: str, data: bytes) -> bool: if not targetpath.startswith(f"{role}/"): raise ValueError(f"targets allowed under {role}/ only") - self.targets().verify_delegate(role, md.signed_bytes, md.signatures) - if md.signed.version != self.targets(role).version + 1: raise ValueError("Invalid version {md.signed.version}") @@ -186,10 +212,19 @@ def submit_role(self, role: str, data: bytes) -> bool: logger.info("Failed to add new version for %s: %s", role, e) return False + # Check that we only write verified metadata + vr = self._get_verification_result(role, md) + if not vr: + logger.info("Role %s failed to verify", role) + return False + + keyids = [keyid[:7] for keyid in vr.signed] + verify_str = f"verified with keys [{', '.join(keyids)}]" + logger.debug("Role %s v%d: %s", role, md.signed.version, verify_str) + # Checks passed: Add new delegated role version self.role_cache[role].append(md) self._targets_infos[f"{role}.json"].version = md.signed.version - logger.debug("%s v%d", role, md.signed.version) # To keep it simple, target content is generated from targetpath for targetpath in md.signed.targets: