diff --git a/tests/test_trusted_metadata_set.py b/tests/test_trusted_metadata_set.py index 219b7c72..a75f0999 100644 --- a/tests/test_trusted_metadata_set.py +++ b/tests/test_trusted_metadata_set.py @@ -1,12 +1,12 @@ import logging -from typing import Optional, Union +from typing import Optional, Union, Callable import os import sys import unittest from datetime import datetime from tuf import exceptions -from tuf.api.metadata import Metadata, MetaFile +from tuf.api.metadata import Metadata, Signed, Timestamp, Snapshot, MetaFile from tuf.ngclient._internal.trusted_metadata_set import TrustedMetadataSet from securesystemslib.signer import SSlibSigner @@ -84,41 +84,15 @@ def _update_all_besides_targets( snapshot_bytes = snapshot_bytes or self.metadata["snapshot"] self.trusted_set.update_snapshot(snapshot_bytes) - def _modify_timestamp_meta(self, version: Optional[int] = 1): - """Remove hashes and length from timestamp.meta["snapshot.json"]. - Create a timestamp.meta["snapshot.json"] containing only version. - - Args: - version: - Version used when instantiating MetaFile for timestamp.meta. - - """ - timestamp = Metadata.from_bytes(self.metadata["timestamp"]) - timestamp.signed.meta["snapshot.json"] = MetaFile(version) - timestamp.sign(self.keystore["timestamp"]) - return timestamp.to_bytes() - - def _modify_snapshot_meta( - self, version: Union[int, None] = 1, length: Optional[int]= None - ): - """Modify hashes and length from snapshot_meta.meta["snapshot.json"]. - If version and length is None, then snapshot meta will be an empty dictionary. - - Args: - version: - Version used when instantiating MetaFile for snapshot.meta. - length: - Length used when instantiating MetaFile for snapshot.meta. - - """ - snapshot = Metadata.from_bytes(self.metadata["snapshot"]) - if version is None and length is None: - snapshot.signed.meta = {} - else: - for metafile_path in snapshot.signed.meta: - snapshot.signed.meta[metafile_path] = MetaFile(version, length) - snapshot.sign(self.keystore["snapshot"]) - return snapshot.to_bytes() + def modify_metadata( + self, rolename: str, modification_func: Callable[["Signed"], None] + ): + """Instantiate metadata from rolename type, call modification_func and + sign it again with self.keystore[rolename] signer.""" + metadata = Metadata.from_bytes(self.metadata[rolename]) + modification_func(metadata.signed) + metadata.sign(self.keystore[rolename]) + return metadata.to_bytes() def test_update(self): self.trusted_set.root_update_finished() @@ -255,7 +229,10 @@ def test_update_timestamp_new_timestamp_ver_below_trusted_ver(self): self.trusted_set.update_timestamp(self.metadata["timestamp"]) def test_update_timestamp_snapshot_ver_below_trusted_snapshot_ver(self): - modified_timestamp = self._modify_timestamp_meta(version=3) + def version_modifier(timestamp: Timestamp): + timestamp.version = 3 + + modified_timestamp = self.modify_metadata("timestamp", version_modifier) self._root_updated_and_update_timestamp(modified_timestamp) # new_timestamp.snapshot.version < trusted_timestamp.snapshot.version with self.assertRaises(exceptions.ReplayedMetadataError): @@ -272,16 +249,27 @@ def test_update_timestamp_expired(self): def test_update_snapshot_cannot_verify_snapshot_with_threshold(self): - modified_timestamp = self._modify_timestamp_meta() - self._root_updated_and_update_timestamp(modified_timestamp) + def hashes_length_modifier(timestamp: Timestamp): + timestamp.meta["snapshot.json"].hashes = None + timestamp.meta["snapshot.json"].length = None + + timestamp = self.modify_metadata("timestamp", hashes_length_modifier) + self._root_updated_and_update_timestamp(timestamp) snapshot = Metadata.from_bytes(self.metadata["snapshot"]) snapshot.signatures.clear() with self.assertRaises(exceptions.UnsignedMetadataError): self.trusted_set.update_snapshot(snapshot.to_bytes()) def test_update_snapshot_version_different_timestamp_snapshot_version(self): - modified_timestamp = self._modify_timestamp_meta(version=2) - self._root_updated_and_update_timestamp(modified_timestamp) + def hashes_length_version_modifier(timestamp: Timestamp): + timestamp.meta["snapshot.json"].hashes = None + timestamp.meta["snapshot.json"].length = None + timestamp.meta["snapshot.json"].version = 2 + + timestamp = self.modify_metadata( + "timestamp", hashes_length_version_modifier + ) + self._root_updated_and_update_timestamp(timestamp) # new_snapshot.version != trusted timestamp.meta["snapshot"].version snapshot = Metadata.from_bytes(self.metadata["snapshot"]) snapshot.signed.version = 3 @@ -290,8 +278,12 @@ def test_update_snapshot_version_different_timestamp_snapshot_version(self): self.trusted_set.update_snapshot(snapshot.to_bytes()) def test_update_snapshot_after_successful_update_new_snapshot_no_meta(self): - modified_timestamp = self._modify_timestamp_meta() - self._update_all_besides_targets(modified_timestamp) + def hashes_length_modifier(timestamp: Timestamp): + timestamp.meta["snapshot.json"].hashes = None + timestamp.meta["snapshot.json"].length = None + + timestamp = self.modify_metadata("timestamp", hashes_length_modifier) + self._update_all_besides_targets(timestamp) # Test removing a meta_file in new_snapshot compared to the old snapshot snapshot = Metadata.from_bytes(self.metadata["snapshot"]) snapshot.signed.meta = {} @@ -300,8 +292,12 @@ def test_update_snapshot_after_successful_update_new_snapshot_no_meta(self): self.trusted_set.update_snapshot(snapshot.to_bytes()) def test_update_snapshot_after_succesfull_update_new_snapshot_meta_version_different(self): - modified_timestamp = self._modify_timestamp_meta() - self._root_updated_and_update_timestamp(modified_timestamp) + def hashes_length_modifier(timestamp: Timestamp): + timestamp.meta["snapshot.json"].hashes = None + timestamp.meta["snapshot.json"].length = None + + timestamp = self.modify_metadata("timestamp", hashes_length_modifier) + self._root_updated_and_update_timestamp(timestamp) # snapshot.meta["project1"].version != new_snapshot.meta["project1"].version snapshot = Metadata.from_bytes(self.metadata["snapshot"]) for metafile_path in snapshot.signed.meta.keys(): @@ -312,8 +308,12 @@ def test_update_snapshot_after_succesfull_update_new_snapshot_meta_version_diffe self.trusted_set.update_snapshot(self.metadata["snapshot"]) def test_update_snapshot_expired_new_snapshot(self): - modified_timestamp = self._modify_timestamp_meta() - self._root_updated_and_update_timestamp(modified_timestamp) + def hashes_length_modifier(timestamp: Timestamp): + timestamp.meta["snapshot.json"].hashes = None + timestamp.meta["snapshot.json"].length = None + + timestamp = self.modify_metadata("timestamp", hashes_length_modifier) + self._root_updated_and_update_timestamp(timestamp) # new_snapshot has expired snapshot = Metadata.from_bytes(self.metadata["snapshot"]) snapshot.signed.expires = datetime(1970, 1, 1) @@ -323,34 +323,48 @@ def test_update_snapshot_expired_new_snapshot(self): def test_update_targets_no_meta_in_snapshot(self): - modified_timestamp = self._modify_timestamp_meta() - modified_snapshot = self._modify_snapshot_meta(version=None) - self._update_all_besides_targets( - timestamp_bytes = modified_timestamp, - snapshot_bytes= modified_snapshot - ) + def hashes_length_modifier(timestamp: Timestamp): + timestamp.meta["snapshot.json"].hashes = None + timestamp.meta["snapshot.json"].length = None + + timestamp = self.modify_metadata("timestamp", hashes_length_modifier) + def no_meta_modifier(snapshot: Snapshot): + snapshot.meta = {} + + snapshot = self.modify_metadata("snapshot", no_meta_modifier) + self._update_all_besides_targets(timestamp, snapshot) # remove meta information with information about targets from snapshot with self.assertRaises(exceptions.RepositoryError): self.trusted_set.update_targets(self.metadata["targets"]) def test_update_targets_hash_different_than_snapshot_meta_hash(self): - modified_timestamp = self._modify_timestamp_meta() - modified_snapshot = self._modify_snapshot_meta(version=1, length=1) - self._update_all_besides_targets( - timestamp_bytes = modified_timestamp, - snapshot_bytes= modified_snapshot - ) + def hashes_length_modifier(timestamp: Timestamp): + timestamp.meta["snapshot.json"].hashes = None + timestamp.meta["snapshot.json"].length = None + + timestamp = self.modify_metadata("timestamp", hashes_length_modifier) + def meta_length_modifier(snapshot: Snapshot): + for metafile_path in snapshot.meta: + snapshot.meta[metafile_path] = MetaFile(version=1, length=1) + + snapshot = self.modify_metadata("snapshot", meta_length_modifier) + self._update_all_besides_targets(timestamp, snapshot) # observed_hash != stored hash in snapshot meta for targets with self.assertRaises(exceptions.RepositoryError): self.trusted_set.update_targets(self.metadata["targets"]) def test_update_targets_version_different_snapshot_meta_version(self): - modified_timestamp = self._modify_timestamp_meta() - modified_snapshot = self._modify_snapshot_meta(version=2) - self._update_all_besides_targets( - timestamp_bytes = modified_timestamp, - snapshot_bytes= modified_snapshot - ) + def hashes_length_modifier(timestamp: Timestamp): + timestamp.meta["snapshot.json"].hashes = None + timestamp.meta["snapshot.json"].length = None + + timestamp = self.modify_metadata("timestamp", hashes_length_modifier) + def meta_modifier(snapshot: Snapshot): + for metafile_path in snapshot.meta: + snapshot.meta[metafile_path] = MetaFile(version=2) + + snapshot = self.modify_metadata("snapshot", meta_modifier) + self._update_all_besides_targets(timestamp, snapshot) # new_delegate.signed.version != meta.version stored in snapshot with self.assertRaises(exceptions.BadVersionNumberError): self.trusted_set.update_targets(self.metadata["targets"])