mirror of
https://github.com/theupdateframework/python-tuf
synced 2026-05-24 10:08:28 +00:00
examples: Use verification results in repo example
This is an example of using the verification resutls in a repository. The only remaining tricky part is in _get_verification_result(): * has to figure out the delegating metadata (something we currently cannot provide in repository.Repository for the general case) * Needs a special case for first root Signed-off-by: Jussi Kukkonen <jkukkonen@google.com>
This commit is contained in:
parent
26bdbbe20c
commit
b8dbe307db
1 changed files with 43 additions and 8 deletions
|
|
@ -8,7 +8,7 @@
|
|||
import logging
|
||||
from collections import defaultdict
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Dict, List
|
||||
from typing import Dict, List, Union
|
||||
|
||||
from securesystemslib import keys
|
||||
from securesystemslib.signer import Key, Signer, SSlibKey, SSlibSigner
|
||||
|
|
@ -20,10 +20,13 @@
|
|||
Metadata,
|
||||
MetaFile,
|
||||
Root,
|
||||
RootVerificationResult,
|
||||
Signed,
|
||||
Snapshot,
|
||||
TargetFile,
|
||||
Targets,
|
||||
Timestamp,
|
||||
VerificationResult,
|
||||
)
|
||||
from tuf.repository import Repository
|
||||
|
||||
|
|
@ -89,6 +92,27 @@ def targets_infos(self) -> Dict[str, MetaFile]:
|
|||
def snapshot_info(self) -> MetaFile:
|
||||
return self._snapshot_info
|
||||
|
||||
def _get_verification_result(
|
||||
self, role: str, md: Metadata
|
||||
) -> Union[VerificationResult, RootVerificationResult]:
|
||||
"""Verify roles metadata using the existing repository metadata"""
|
||||
if role == Root.type:
|
||||
assert isinstance(md.signed, Root)
|
||||
root = self.root()
|
||||
if root.version == 0:
|
||||
# special case first root
|
||||
root = md.signed
|
||||
return md.signed.get_root_verification_result(
|
||||
root, md.signed_bytes, md.signatures
|
||||
)
|
||||
if role in [Timestamp.type, Snapshot.type, Targets.type]:
|
||||
delegator: Signed = self.root()
|
||||
else:
|
||||
delegator = self.targets()
|
||||
return delegator.get_verification_result(
|
||||
role, md.signed_bytes, md.signatures
|
||||
)
|
||||
|
||||
def open(self, role: str) -> Metadata:
|
||||
"""Return current Metadata for role from 'storage' (or create a new one)"""
|
||||
|
||||
|
|
@ -112,6 +136,14 @@ def close(self, role: str, md: Metadata) -> None:
|
|||
for signer in self.signer_cache[role]:
|
||||
md.sign(signer, append=True)
|
||||
|
||||
# Double check that we only write verified metadata
|
||||
vr = self._get_verification_result(role, md)
|
||||
if not vr:
|
||||
raise ValueError(f"Role {role} failed to verify")
|
||||
keyids = [keyid[:7] for keyid in vr.signed]
|
||||
verify_str = f"verified with keys [{', '.join(keyids)}]"
|
||||
logger.debug("Role %s v%d: %s", role, md.signed.version, verify_str)
|
||||
|
||||
# store new metadata version, update version caches
|
||||
self.role_cache[role].append(md)
|
||||
if role == "snapshot":
|
||||
|
|
@ -130,8 +162,6 @@ def add_target(self, path: str, content: str) -> None:
|
|||
with self.edit_targets() as targets:
|
||||
targets.targets[path] = TargetFile.from_data(path, data)
|
||||
|
||||
logger.debug("Targets v%d", targets.version)
|
||||
|
||||
# update snapshot, timestamp
|
||||
self.do_snapshot()
|
||||
self.do_timestamp()
|
||||
|
|
@ -157,8 +187,6 @@ def submit_delegation(self, rolename: str, data: bytes) -> bool:
|
|||
logger.info("Failed to add delegation for %s: %s", rolename, e)
|
||||
return False
|
||||
|
||||
logger.debug("Targets v%d", targets.version)
|
||||
|
||||
# update snapshot, timestamp
|
||||
self.do_snapshot()
|
||||
self.do_timestamp()
|
||||
|
|
@ -177,8 +205,6 @@ def submit_role(self, role: str, data: bytes) -> bool:
|
|||
if not targetpath.startswith(f"{role}/"):
|
||||
raise ValueError(f"targets allowed under {role}/ only")
|
||||
|
||||
self.targets().verify_delegate(role, md.signed_bytes, md.signatures)
|
||||
|
||||
if md.signed.version != self.targets(role).version + 1:
|
||||
raise ValueError("Invalid version {md.signed.version}")
|
||||
|
||||
|
|
@ -186,10 +212,19 @@ def submit_role(self, role: str, data: bytes) -> bool:
|
|||
logger.info("Failed to add new version for %s: %s", role, e)
|
||||
return False
|
||||
|
||||
# Check that we only write verified metadata
|
||||
vr = self._get_verification_result(role, md)
|
||||
if not vr:
|
||||
logger.info("Role %s failed to verify", role)
|
||||
return False
|
||||
|
||||
keyids = [keyid[:7] for keyid in vr.signed]
|
||||
verify_str = f"verified with keys [{', '.join(keyids)}]"
|
||||
logger.debug("Role %s v%d: %s", role, md.signed.version, verify_str)
|
||||
|
||||
# Checks passed: Add new delegated role version
|
||||
self.role_cache[role].append(md)
|
||||
self._targets_infos[f"{role}.json"].version = md.signed.version
|
||||
logger.debug("%s v%d", role, md.signed.version)
|
||||
|
||||
# To keep it simple, target content is generated from targetpath
|
||||
for targetpath in md.signed.targets:
|
||||
|
|
|
|||
Loading…
Reference in a new issue