diff --git a/tests/test_updater_top_level_update.py b/tests/test_updater_top_level_update.py index 45eea203..564a26a5 100644 --- a/tests/test_updater_top_level_update.py +++ b/tests/test_updater_top_level_update.py @@ -8,12 +8,13 @@ import os import sys import tempfile +from typing import List, Optional import unittest from datetime import datetime, timedelta from tests import utils from tests.repository_simulator import RepositorySimulator -from tuf.api.metadata import Metadata +from tuf.api.metadata import Metadata, TOP_LEVEL_ROLE_NAMES from tuf.exceptions import ( BadVersionNumberError, ExpiredMetadataError, @@ -69,10 +70,21 @@ def _init_updater(self) -> Updater: self.sim, ) + def _assert_files_exist(self, roles: List[str]) -> None: + """Assert that local metadata files exist for 'roles'""" + expected_files = sorted([f"{role}.json" for role in roles]) + local_metadata_files = sorted(os.listdir(self.metadata_dir)) + self.assertListEqual(local_metadata_files, expected_files) + + def _assert_content_equals(self, role: str, version: Optional[int]=None) -> None: + """Assert that local file content is the expected""" + expected_content = self.sim._fetch_metadata(role, version) + with open(os.path.join(self.metadata_dir, f"{role}.json"), "rb") as f: + self.assertEqual(f.read(), expected_content) + def test_first_time_refresh(self) -> None: # Metadata dir contains only the mandatory initial root.json - metadata_files = os.listdir(self.metadata_dir) - self.assertListEqual(metadata_files, ["root.json"]) + self._assert_files_exist(["root"]) # Add one more root version to repository so that # refresh() updates from local trusted root (v1) to @@ -82,19 +94,19 @@ def test_first_time_refresh(self) -> None: self._run_refresh() - # Top-level metadata can be found in metadata dir - metadata_files_after_refresh = os.listdir(self.metadata_dir) - metadata_files_after_refresh.sort() - self.assertListEqual( - metadata_files_after_refresh, - ["root.json", "snapshot.json", "targets.json", "timestamp.json"], - ) + self._assert_files_exist(TOP_LEVEL_ROLE_NAMES) + for role in TOP_LEVEL_ROLE_NAMES: + version = 2 if role == "root" else None + self._assert_content_equals(role, version) def test_trusted_root_missing(self) -> None: os.remove(os.path.join(self.metadata_dir, "root.json")) with self.assertRaises(OSError): self._run_refresh() + # Metadata dir is empty + self.assertFalse(os.listdir(self.metadata_dir)) + def test_trusted_root_expired(self) -> None: # Create an expired root version self.sim.root.expires = self.past_datetime @@ -107,8 +119,8 @@ def test_trusted_root_expired(self) -> None: with self.assertRaises(ExpiredMetadataError): updater.refresh() - with open(os.path.join(self.metadata_dir, "root.json"), "rb") as f: - self.assertEqual(f.read(), self.sim.signed_roots[-1]) + self._assert_files_exist(["root"]) + self._assert_content_equals("root", 2) # Local root metadata can be loaded even if expired updater = self._init_updater() @@ -117,11 +129,11 @@ def test_trusted_root_expired(self) -> None: self.sim.root.expires = self.sim.safe_expiry self.sim.root.version += 1 self.sim.publish_root() - updater.refresh() - with open(os.path.join(self.metadata_dir, "root.json"), "rb") as f: - self.assertEqual(f.read(), self.sim.signed_roots[-1]) + # Root is successfully updated to latest version + self._assert_files_exist(TOP_LEVEL_ROLE_NAMES) + self._assert_content_equals("root", 3) def test_trusted_root_unsigned(self) -> None: # Local trusted root is not signed @@ -133,6 +145,11 @@ def test_trusted_root_unsigned(self) -> None: with self.assertRaises(UnsignedMetadataError): self._run_refresh() + # The update failed, no changes in metadata + self._assert_files_exist(["root"]) + md_root_after = Metadata.from_file(root_path) + self.assertEqual(md_root.to_bytes(), md_root_after.to_bytes()) + def test_max_root_rotations(self) -> None: # Root must stop looking for new versions after Y number of # intermediate files were downloaded. @@ -175,6 +192,11 @@ def test_intermediate_root_incorrectly_signed(self) -> None: with self.assertRaises(UnsignedMetadataError): self._run_refresh() + # The update failed, latest root version is v1 + self._assert_files_exist(["root"]) + self._assert_content_equals("root", 1) + + def test_intermediate_root_expired(self) -> None: # The expiration of the new (intermediate) root metadata file # does not matter yet @@ -190,20 +212,24 @@ def test_intermediate_root_expired(self) -> None: self.sim.publish_root() self._run_refresh() - md_root = Metadata.from_file( - os.path.join(self.metadata_dir, "root.json") - ) - self.assertEqual(md_root.signed.version, self.sim.root.version) + + # Successfully updated to root v3 + self._assert_files_exist(TOP_LEVEL_ROLE_NAMES) + self._assert_content_equals("root", 3) def test_final_root_incorrectly_signed(self) -> None: # Check for an arbitrary software attack - self.sim.root.version += 1 + self.sim.root.version += 1 # root v2 self.sim.signers["root"].clear() self.sim.publish_root() with self.assertRaises(UnsignedMetadataError): self._run_refresh() + # The update failed, latest root version is v1 + self._assert_files_exist(["root"]) + self._assert_content_equals("root", 1) + def test_new_root_same_version(self) -> None: # Check for a rollback_attack # Repository serves a root file with the same version as previous @@ -211,6 +237,10 @@ def test_new_root_same_version(self) -> None: with self.assertRaises(ReplayedMetadataError): self._run_refresh() + # The update failed, latest root version is v1 + self._assert_files_exist(["root"]) + self._assert_content_equals("root", 1) + def test_new_root_nonconsecutive_version(self) -> None: # Repository serves non-consecutive root version self.sim.root.version += 2 @@ -218,6 +248,10 @@ def test_new_root_nonconsecutive_version(self) -> None: with self.assertRaises(ReplayedMetadataError): self._run_refresh() + # The update failed, latest root version is v1 + self._assert_files_exist(["root"]) + self._assert_content_equals("root", 1) + def test_final_root_expired(self) -> None: # Check for a freeze attack # Final root is expired @@ -228,12 +262,18 @@ def test_final_root_expired(self) -> None: with self.assertRaises(ExpiredMetadataError): self._run_refresh() + # The update failed but final root is persisted on the file system + self._assert_files_exist(["root"]) + self._assert_content_equals("root", 2) + def test_new_timestamp_unsigned(self) -> None: # Check for an arbitrary software attack self.sim.signers["timestamp"].clear() with self.assertRaises(UnsignedMetadataError): self._run_refresh() + self._assert_files_exist(["root"]) + def test_new_timestamp_version_rollback(self) -> None: # Check for a rollback attack self.sim.timestamp.version = 2 @@ -243,19 +283,25 @@ def test_new_timestamp_version_rollback(self) -> None: with self.assertRaises(ReplayedMetadataError): self._run_refresh() + md_timestamp = Metadata.from_file(os.path.join(self.metadata_dir, "timestamp.json")) + self.assertEqual(md_timestamp.signed.version, 2) + def test_new_timestamp_snapshot_rollback(self) -> None: # Check for a rollback attack. self.sim.snapshot.version = 2 - self.sim.update_timestamp() + self.sim.update_timestamp() # timestamp v2 self._run_refresh() # Snapshot meta version is smaller than previous self.sim.timestamp.snapshot_meta.version = 1 - self.sim.timestamp.version += 1 + self.sim.timestamp.version += 1 # timestamp v3 with self.assertRaises(ReplayedMetadataError): self._run_refresh() + md_timestamp = Metadata.from_file(os.path.join(self.metadata_dir, "timestamp.json")) + self.assertEqual(md_timestamp.signed.version, 2) + def test_new_timestamp_expired(self) -> None: # Check for a freeze attack self.sim.timestamp.expires = self.past_datetime @@ -264,6 +310,8 @@ def test_new_timestamp_expired(self) -> None: with self.assertRaises(ExpiredMetadataError): self._run_refresh() + self._assert_files_exist(["root"]) + def test_new_snapshot_hash_mismatch(self) -> None: # Check against timestamp role’s snapshot hash @@ -283,12 +331,20 @@ def test_new_snapshot_hash_mismatch(self) -> None: with self.assertRaises(RepositoryError): self._run_refresh() + md_timestamp = Metadata.from_file(os.path.join(self.metadata_dir, "timestamp.json")) + self.assertEqual(md_timestamp.signed.version, 3) + + md_snapshot = Metadata.from_file(os.path.join(self.metadata_dir, "snapshot.json")) + self.assertEqual(md_snapshot.signed.version, 1) + def test_new_snapshot_unsigned(self) -> None: # Check for an arbitrary software attack self.sim.signers["snapshot"].clear() with self.assertRaises(UnsignedMetadataError): self._run_refresh() + self._assert_files_exist(["root", "timestamp"]) + # TODO: RepositorySimulator works always with consistent snapshot # enabled which forces the client to look for the snapshot version # written in timestamp (which leads to "Unknown snapshot version"). @@ -315,6 +371,9 @@ def test_new_snapshot_version_rollback(self) -> None: with self.assertRaises(ReplayedMetadataError): self._run_refresh() + md_snapshot = Metadata.from_file(os.path.join(self.metadata_dir, "snapshot.json")) + self.assertEqual(md_snapshot.signed.version, 2) + def test_new_snapshot_expired(self) -> None: # Check for a freeze attack self.sim.snapshot.expires = self.past_datetime @@ -323,6 +382,9 @@ def test_new_snapshot_expired(self) -> None: with self.assertRaises(ExpiredMetadataError): self._run_refresh() + self._assert_files_exist(["root", "timestamp"]) + + def test_new_targets_hash_mismatch(self) -> None: # Check against snapshot role’s targets hashes @@ -341,12 +403,20 @@ def test_new_targets_hash_mismatch(self) -> None: with self.assertRaises(RepositoryError): self._run_refresh() + md_snapshot = Metadata.from_file(os.path.join(self.metadata_dir, "snapshot.json")) + self.assertEqual(md_snapshot.signed.version, 3) + + md_targets = Metadata.from_file(os.path.join(self.metadata_dir, "targets.json")) + self.assertEqual(md_targets.signed.version, 1) + def test_new_targets_unsigned(self) -> None: # Check for an arbitrary software attack self.sim.signers["targets"].clear() with self.assertRaises(UnsignedMetadataError): self._run_refresh() + self._assert_files_exist(["root", "timestamp", "snapshot"]) + # TODO: RepositorySimulator works always with consistent snapshot # enabled which forces the client to look for the targets version # written in snapshot (which leads to "Unknown targets version"). @@ -366,6 +436,8 @@ def test_new_targets_expired(self) -> None: with self.assertRaises(ExpiredMetadataError): self._run_refresh() + self._assert_files_exist(["root", "timestamp", "snapshot"]) + if __name__ == "__main__":