mirror of
https://github.com/theupdateframework/python-tuf
synced 2026-05-24 10:08:28 +00:00
ngtests: Add addtional asserts for files on disk
Extend the TestRefresh cases with additional checks for expected metadata files and their content written on the file system. Signed-off-by: Teodora Sechkova <tsechkova@vmware.com>
This commit is contained in:
parent
954331c8af
commit
8a2c7857ac
1 changed files with 94 additions and 22 deletions
|
|
@ -8,12 +8,13 @@
|
|||
import os
|
||||
import sys
|
||||
import tempfile
|
||||
from typing import List, Optional
|
||||
import unittest
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from tests import utils
|
||||
from tests.repository_simulator import RepositorySimulator
|
||||
from tuf.api.metadata import Metadata
|
||||
from tuf.api.metadata import Metadata, TOP_LEVEL_ROLE_NAMES
|
||||
from tuf.exceptions import (
|
||||
BadVersionNumberError,
|
||||
ExpiredMetadataError,
|
||||
|
|
@ -69,10 +70,21 @@ def _init_updater(self) -> Updater:
|
|||
self.sim,
|
||||
)
|
||||
|
||||
def _assert_files_exist(self, roles: List[str]) -> None:
|
||||
"""Assert that local metadata files exist for 'roles'"""
|
||||
expected_files = sorted([f"{role}.json" for role in roles])
|
||||
local_metadata_files = sorted(os.listdir(self.metadata_dir))
|
||||
self.assertListEqual(local_metadata_files, expected_files)
|
||||
|
||||
def _assert_content_equals(self, role: str, version: Optional[int]=None) -> None:
|
||||
"""Assert that local file content is the expected"""
|
||||
expected_content = self.sim._fetch_metadata(role, version)
|
||||
with open(os.path.join(self.metadata_dir, f"{role}.json"), "rb") as f:
|
||||
self.assertEqual(f.read(), expected_content)
|
||||
|
||||
def test_first_time_refresh(self) -> None:
|
||||
# Metadata dir contains only the mandatory initial root.json
|
||||
metadata_files = os.listdir(self.metadata_dir)
|
||||
self.assertListEqual(metadata_files, ["root.json"])
|
||||
self._assert_files_exist(["root"])
|
||||
|
||||
# Add one more root version to repository so that
|
||||
# refresh() updates from local trusted root (v1) to
|
||||
|
|
@ -82,19 +94,19 @@ def test_first_time_refresh(self) -> None:
|
|||
|
||||
self._run_refresh()
|
||||
|
||||
# Top-level metadata can be found in metadata dir
|
||||
metadata_files_after_refresh = os.listdir(self.metadata_dir)
|
||||
metadata_files_after_refresh.sort()
|
||||
self.assertListEqual(
|
||||
metadata_files_after_refresh,
|
||||
["root.json", "snapshot.json", "targets.json", "timestamp.json"],
|
||||
)
|
||||
self._assert_files_exist(TOP_LEVEL_ROLE_NAMES)
|
||||
for role in TOP_LEVEL_ROLE_NAMES:
|
||||
version = 2 if role == "root" else None
|
||||
self._assert_content_equals(role, version)
|
||||
|
||||
def test_trusted_root_missing(self) -> None:
|
||||
os.remove(os.path.join(self.metadata_dir, "root.json"))
|
||||
with self.assertRaises(OSError):
|
||||
self._run_refresh()
|
||||
|
||||
# Metadata dir is empty
|
||||
self.assertFalse(os.listdir(self.metadata_dir))
|
||||
|
||||
def test_trusted_root_expired(self) -> None:
|
||||
# Create an expired root version
|
||||
self.sim.root.expires = self.past_datetime
|
||||
|
|
@ -107,8 +119,8 @@ def test_trusted_root_expired(self) -> None:
|
|||
with self.assertRaises(ExpiredMetadataError):
|
||||
updater.refresh()
|
||||
|
||||
with open(os.path.join(self.metadata_dir, "root.json"), "rb") as f:
|
||||
self.assertEqual(f.read(), self.sim.signed_roots[-1])
|
||||
self._assert_files_exist(["root"])
|
||||
self._assert_content_equals("root", 2)
|
||||
|
||||
# Local root metadata can be loaded even if expired
|
||||
updater = self._init_updater()
|
||||
|
|
@ -117,11 +129,11 @@ def test_trusted_root_expired(self) -> None:
|
|||
self.sim.root.expires = self.sim.safe_expiry
|
||||
self.sim.root.version += 1
|
||||
self.sim.publish_root()
|
||||
|
||||
updater.refresh()
|
||||
|
||||
with open(os.path.join(self.metadata_dir, "root.json"), "rb") as f:
|
||||
self.assertEqual(f.read(), self.sim.signed_roots[-1])
|
||||
# Root is successfully updated to latest version
|
||||
self._assert_files_exist(TOP_LEVEL_ROLE_NAMES)
|
||||
self._assert_content_equals("root", 3)
|
||||
|
||||
def test_trusted_root_unsigned(self) -> None:
|
||||
# Local trusted root is not signed
|
||||
|
|
@ -133,6 +145,11 @@ def test_trusted_root_unsigned(self) -> None:
|
|||
with self.assertRaises(UnsignedMetadataError):
|
||||
self._run_refresh()
|
||||
|
||||
# The update failed, no changes in metadata
|
||||
self._assert_files_exist(["root"])
|
||||
md_root_after = Metadata.from_file(root_path)
|
||||
self.assertEqual(md_root.to_bytes(), md_root_after.to_bytes())
|
||||
|
||||
def test_max_root_rotations(self) -> None:
|
||||
# Root must stop looking for new versions after Y number of
|
||||
# intermediate files were downloaded.
|
||||
|
|
@ -175,6 +192,11 @@ def test_intermediate_root_incorrectly_signed(self) -> None:
|
|||
with self.assertRaises(UnsignedMetadataError):
|
||||
self._run_refresh()
|
||||
|
||||
# The update failed, latest root version is v1
|
||||
self._assert_files_exist(["root"])
|
||||
self._assert_content_equals("root", 1)
|
||||
|
||||
|
||||
def test_intermediate_root_expired(self) -> None:
|
||||
# The expiration of the new (intermediate) root metadata file
|
||||
# does not matter yet
|
||||
|
|
@ -190,20 +212,24 @@ def test_intermediate_root_expired(self) -> None:
|
|||
self.sim.publish_root()
|
||||
|
||||
self._run_refresh()
|
||||
md_root = Metadata.from_file(
|
||||
os.path.join(self.metadata_dir, "root.json")
|
||||
)
|
||||
self.assertEqual(md_root.signed.version, self.sim.root.version)
|
||||
|
||||
# Successfully updated to root v3
|
||||
self._assert_files_exist(TOP_LEVEL_ROLE_NAMES)
|
||||
self._assert_content_equals("root", 3)
|
||||
|
||||
def test_final_root_incorrectly_signed(self) -> None:
|
||||
# Check for an arbitrary software attack
|
||||
self.sim.root.version += 1
|
||||
self.sim.root.version += 1 # root v2
|
||||
self.sim.signers["root"].clear()
|
||||
self.sim.publish_root()
|
||||
|
||||
with self.assertRaises(UnsignedMetadataError):
|
||||
self._run_refresh()
|
||||
|
||||
# The update failed, latest root version is v1
|
||||
self._assert_files_exist(["root"])
|
||||
self._assert_content_equals("root", 1)
|
||||
|
||||
def test_new_root_same_version(self) -> None:
|
||||
# Check for a rollback_attack
|
||||
# Repository serves a root file with the same version as previous
|
||||
|
|
@ -211,6 +237,10 @@ def test_new_root_same_version(self) -> None:
|
|||
with self.assertRaises(ReplayedMetadataError):
|
||||
self._run_refresh()
|
||||
|
||||
# The update failed, latest root version is v1
|
||||
self._assert_files_exist(["root"])
|
||||
self._assert_content_equals("root", 1)
|
||||
|
||||
def test_new_root_nonconsecutive_version(self) -> None:
|
||||
# Repository serves non-consecutive root version
|
||||
self.sim.root.version += 2
|
||||
|
|
@ -218,6 +248,10 @@ def test_new_root_nonconsecutive_version(self) -> None:
|
|||
with self.assertRaises(ReplayedMetadataError):
|
||||
self._run_refresh()
|
||||
|
||||
# The update failed, latest root version is v1
|
||||
self._assert_files_exist(["root"])
|
||||
self._assert_content_equals("root", 1)
|
||||
|
||||
def test_final_root_expired(self) -> None:
|
||||
# Check for a freeze attack
|
||||
# Final root is expired
|
||||
|
|
@ -228,12 +262,18 @@ def test_final_root_expired(self) -> None:
|
|||
with self.assertRaises(ExpiredMetadataError):
|
||||
self._run_refresh()
|
||||
|
||||
# The update failed but final root is persisted on the file system
|
||||
self._assert_files_exist(["root"])
|
||||
self._assert_content_equals("root", 2)
|
||||
|
||||
def test_new_timestamp_unsigned(self) -> None:
|
||||
# Check for an arbitrary software attack
|
||||
self.sim.signers["timestamp"].clear()
|
||||
with self.assertRaises(UnsignedMetadataError):
|
||||
self._run_refresh()
|
||||
|
||||
self._assert_files_exist(["root"])
|
||||
|
||||
def test_new_timestamp_version_rollback(self) -> None:
|
||||
# Check for a rollback attack
|
||||
self.sim.timestamp.version = 2
|
||||
|
|
@ -243,19 +283,25 @@ def test_new_timestamp_version_rollback(self) -> None:
|
|||
with self.assertRaises(ReplayedMetadataError):
|
||||
self._run_refresh()
|
||||
|
||||
md_timestamp = Metadata.from_file(os.path.join(self.metadata_dir, "timestamp.json"))
|
||||
self.assertEqual(md_timestamp.signed.version, 2)
|
||||
|
||||
def test_new_timestamp_snapshot_rollback(self) -> None:
|
||||
# Check for a rollback attack.
|
||||
self.sim.snapshot.version = 2
|
||||
self.sim.update_timestamp()
|
||||
self.sim.update_timestamp() # timestamp v2
|
||||
self._run_refresh()
|
||||
|
||||
# Snapshot meta version is smaller than previous
|
||||
self.sim.timestamp.snapshot_meta.version = 1
|
||||
self.sim.timestamp.version += 1
|
||||
self.sim.timestamp.version += 1 # timestamp v3
|
||||
|
||||
with self.assertRaises(ReplayedMetadataError):
|
||||
self._run_refresh()
|
||||
|
||||
md_timestamp = Metadata.from_file(os.path.join(self.metadata_dir, "timestamp.json"))
|
||||
self.assertEqual(md_timestamp.signed.version, 2)
|
||||
|
||||
def test_new_timestamp_expired(self) -> None:
|
||||
# Check for a freeze attack
|
||||
self.sim.timestamp.expires = self.past_datetime
|
||||
|
|
@ -264,6 +310,8 @@ def test_new_timestamp_expired(self) -> None:
|
|||
with self.assertRaises(ExpiredMetadataError):
|
||||
self._run_refresh()
|
||||
|
||||
self._assert_files_exist(["root"])
|
||||
|
||||
def test_new_snapshot_hash_mismatch(self) -> None:
|
||||
# Check against timestamp role’s snapshot hash
|
||||
|
||||
|
|
@ -283,12 +331,20 @@ def test_new_snapshot_hash_mismatch(self) -> None:
|
|||
with self.assertRaises(RepositoryError):
|
||||
self._run_refresh()
|
||||
|
||||
md_timestamp = Metadata.from_file(os.path.join(self.metadata_dir, "timestamp.json"))
|
||||
self.assertEqual(md_timestamp.signed.version, 3)
|
||||
|
||||
md_snapshot = Metadata.from_file(os.path.join(self.metadata_dir, "snapshot.json"))
|
||||
self.assertEqual(md_snapshot.signed.version, 1)
|
||||
|
||||
def test_new_snapshot_unsigned(self) -> None:
|
||||
# Check for an arbitrary software attack
|
||||
self.sim.signers["snapshot"].clear()
|
||||
with self.assertRaises(UnsignedMetadataError):
|
||||
self._run_refresh()
|
||||
|
||||
self._assert_files_exist(["root", "timestamp"])
|
||||
|
||||
# TODO: RepositorySimulator works always with consistent snapshot
|
||||
# enabled which forces the client to look for the snapshot version
|
||||
# written in timestamp (which leads to "Unknown snapshot version").
|
||||
|
|
@ -315,6 +371,9 @@ def test_new_snapshot_version_rollback(self) -> None:
|
|||
with self.assertRaises(ReplayedMetadataError):
|
||||
self._run_refresh()
|
||||
|
||||
md_snapshot = Metadata.from_file(os.path.join(self.metadata_dir, "snapshot.json"))
|
||||
self.assertEqual(md_snapshot.signed.version, 2)
|
||||
|
||||
def test_new_snapshot_expired(self) -> None:
|
||||
# Check for a freeze attack
|
||||
self.sim.snapshot.expires = self.past_datetime
|
||||
|
|
@ -323,6 +382,9 @@ def test_new_snapshot_expired(self) -> None:
|
|||
with self.assertRaises(ExpiredMetadataError):
|
||||
self._run_refresh()
|
||||
|
||||
self._assert_files_exist(["root", "timestamp"])
|
||||
|
||||
|
||||
def test_new_targets_hash_mismatch(self) -> None:
|
||||
# Check against snapshot role’s targets hashes
|
||||
|
||||
|
|
@ -341,12 +403,20 @@ def test_new_targets_hash_mismatch(self) -> None:
|
|||
with self.assertRaises(RepositoryError):
|
||||
self._run_refresh()
|
||||
|
||||
md_snapshot = Metadata.from_file(os.path.join(self.metadata_dir, "snapshot.json"))
|
||||
self.assertEqual(md_snapshot.signed.version, 3)
|
||||
|
||||
md_targets = Metadata.from_file(os.path.join(self.metadata_dir, "targets.json"))
|
||||
self.assertEqual(md_targets.signed.version, 1)
|
||||
|
||||
def test_new_targets_unsigned(self) -> None:
|
||||
# Check for an arbitrary software attack
|
||||
self.sim.signers["targets"].clear()
|
||||
with self.assertRaises(UnsignedMetadataError):
|
||||
self._run_refresh()
|
||||
|
||||
self._assert_files_exist(["root", "timestamp", "snapshot"])
|
||||
|
||||
# TODO: RepositorySimulator works always with consistent snapshot
|
||||
# enabled which forces the client to look for the targets version
|
||||
# written in snapshot (which leads to "Unknown targets version").
|
||||
|
|
@ -366,6 +436,8 @@ def test_new_targets_expired(self) -> None:
|
|||
with self.assertRaises(ExpiredMetadataError):
|
||||
self._run_refresh()
|
||||
|
||||
self._assert_files_exist(["root", "timestamp", "snapshot"])
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue