python-tuf/tests/test_updater_consistent_snapshot.py

266 lines
9.6 KiB
Python
Raw Permalink Normal View History

# Copyright 2021, New York University and the TUF contributors
# SPDX-License-Identifier: MIT OR Apache-2.0
"""Test ngclient Updater toggling consistent snapshot"""
from __future__ import annotations
import os
import sys
import tempfile
import unittest
from typing import TYPE_CHECKING, Any
from tests import utils
from tests.repository_simulator import RepositorySimulator
from tuf.api.metadata import (
SPECIFICATION_VERSION,
TOP_LEVEL_ROLE_NAMES,
DelegatedRole,
TargetFile,
Targets,
)
from tuf.ngclient import Updater
if TYPE_CHECKING:
from collections.abc import Iterable
class TestConsistentSnapshot(unittest.TestCase):
    """Run the ngclient Updater against a simulated repository while toggling
    'consistent_snapshot' and 'prefix_targets_with_hash', and check which
    version / hash prefixes are requested for each combination"""

    # When this is a directory path (instead of None) every subtest gets its
    # own subdirectory there and the simulator is asked to write() into it
    dump_dir: str | None = None

    def setUp(self) -> None:
        """Create a scratch directory with empty 'metadata' and 'targets'
        subdirectories and reset the subtest counter"""
        self.subtest_count = 0
        self.temp_dir = tempfile.TemporaryDirectory()

        scratch_root = self.temp_dir.name
        self.metadata_dir = os.path.join(scratch_root, "metadata")
        self.targets_dir = os.path.join(scratch_root, "targets")
        for directory in (self.metadata_dir, self.targets_dir):
            os.mkdir(directory)

        # declared only; the instance is assigned in setup_subtest()
        self.sim: RepositorySimulator

    def tearDown(self) -> None:
        """Remove the scratch directory created in setUp()"""
        self.temp_dir.cleanup()

    def setup_subtest(
        self, consistent_snapshot: bool, prefix_targets: bool = True
    ) -> None:
        """Install a fresh simulator for one subtest and, if dumps are
        enabled, create the dump directory belonging to that subtest"""
        self.sim = self._init_repo(consistent_snapshot, prefix_targets)
        self.subtest_count += 1

        if self.dump_dir is None:
            return

        # directory name: last dotted component of the test id, followed by
        # the running subtest number
        name = f"{self.id().split('.')[-1]}-{self.subtest_count}"
        self.sim.dump_dir = os.path.join(self.dump_dir, name)
        os.mkdir(self.sim.dump_dir)

    def teardown_subtest(self) -> None:
        """Write the simulator state when dumps are enabled, then clean the
        local metadata directory so that the next subtest starts fresh"""
        if self.dump_dir is not None:
            self.sim.write()

        utils.cleanup_metadata_dir(self.metadata_dir)

    def _init_repo(
        self, consistent_snapshot: bool, prefix_targets: bool = True
    ) -> RepositorySimulator:
        """Return a new RepositorySimulator configured as requested

        The root metadata gets the given 'consistent_snapshot' value, has its
        version incremented and is published; afterwards the simulator's
        'prefix_targets_with_hash' is set to 'prefix_targets'."""
        repo = RepositorySimulator()

        repo.root.consistent_snapshot = consistent_snapshot
        repo.root.version += 1
        repo.publish_root()

        repo.prefix_targets_with_hash = prefix_targets
        return repo

    def _init_updater(self) -> Updater:
        """Return a new Updater that uses the test's local directories and is
        bootstrapped with the most recent signed root of the simulator"""
        newest_root = self.sim.signed_roots[-1]
        # the simulator itself is handed to the Updater (presumably as its
        # fetcher), so no real network access should be needed
        return Updater(
            self.metadata_dir,
            "https://example.com/metadata/",
            self.targets_dir,
            "https://example.com/targets/",
            self.sim,
            bootstrap=newest_root,
        )

    def _assert_metadata_files_exist(self, roles: Iterable[str]) -> None:
        """Assert that '<role>.json' is present in the local metadata
        directory for every role in 'roles'"""
        present = os.listdir(self.metadata_dir)
        for role in roles:
            self.assertIn(f"{role}.json", present)

    def _assert_targets_files_exist(self, filenames: Iterable[str]) -> None:
        """Assert that every name in 'filenames' is present in the local
        targets directory"""
        present = os.listdir(self.targets_dir)
        for expected_name in filenames:
            self.assertIn(expected_name, present)

    # Per subtest: the 'consistent_snapshot' setting and the expected
    # (role, version) entries recorded by the simulator's fetch tracker
    top_level_roles_data = {
        "consistent_snaphot disabled": {
            "consistent_snapshot": False,
            "calls": [
                ("root", 3),
                ("timestamp", None),
                ("snapshot", None),
                ("targets", None),
            ],
        },
        "consistent_snaphot enabled": {
            "consistent_snapshot": True,
            "calls": [
                ("root", 3),
                ("timestamp", None),
                ("snapshot", 1),
                ("targets", 1),
            ],
        },
    }

    @utils.run_sub_tests_with_dataset(top_level_roles_data)
    def test_top_level_roles_update(
        self, test_case_data: dict[str, Any]
    ) -> None:
        # Check that top-level metadata is requested with the version prefix
        # matching the 'consistent_snapshot' setting and stored locally
        # under plain role names
        try:
            use_consistent: bool = test_case_data["consistent_snapshot"]
            expected_fetches: list[Any] = test_case_data["calls"]

            self.setup_subtest(use_consistent)
            updater = self._init_updater()

            # start from an empty record so only refresh() fetches count
            self.sim.fetch_tracker.metadata.clear()
            updater.refresh()

            # fetched with the expected version (or None)
            self.assertListEqual(
                self.sim.fetch_tracker.metadata, expected_fetches
            )
            # persisted without a version prefix
            self._assert_metadata_files_exist(TOP_LEVEL_ROLE_NAMES)
        finally:
            self.teardown_subtest()

    # Per subtest: the 'consistent_snapshot' setting and the version that
    # every delegated role is expected to be fetched with
    delegated_roles_data = {
        "consistent_snaphot disabled": {
            "consistent_snapshot": False,
            "expected_version": None,
        },
        "consistent_snaphot enabled": {
            "consistent_snapshot": True,
            "expected_version": 1,
        },
    }

    @utils.run_sub_tests_with_dataset(delegated_roles_data)
    def test_delegated_roles_update(
        self, test_case_data: dict[str, Any]
    ) -> None:
        # Check that delegated metadata is requested with the version prefix
        # matching the 'consistent_snapshot' setting and stored locally
        # under plain role names
        try:
            use_consistent: bool = test_case_data["consistent_snapshot"]
            version: int | None = test_case_data["expected_version"]
            delegated_names = ["role1", "..", "."]
            expected_fetches = [(name, version) for name in delegated_names]

            self.setup_subtest(use_consistent)

            # delegate from "targets" to every role name, then update the
            # snapshot once all delegations are in place
            spec_version = ".".join(SPECIFICATION_VERSION)
            for name in delegated_names:
                delegation = DelegatedRole(name, [], 1, False, ["*"], None)
                delegated_targets = Targets(
                    1, spec_version, self.sim.safe_expiry, {}, None
                )
                self.sim.add_delegation(
                    "targets", delegation, delegated_targets
                )
            self.sim.update_snapshot()

            updater = self._init_updater()
            updater.refresh()

            # forget the refresh() fetches; only the delegated ones matter
            self.sim.fetch_tracker.metadata.clear()
            # looking up any target makes the updater fetch delegated roles
            updater.get_targetinfo("anything")

            # fetched with the expected version (or None)
            self.assertListEqual(
                self.sim.fetch_tracker.metadata, expected_fetches
            )
            # persisted without a version prefix
            self._assert_metadata_files_exist(delegated_names)
        finally:
            self.teardown_subtest()

    # Per subtest: repository settings, the hash algorithm whose digest is
    # expected as fetch prefix (None: no prefix), and the target paths to use
    targets_download_data = {
        "consistent_snaphot disabled": {
            "consistent_snapshot": False,
            "prefix_targets": True,
            "hash_algo": None,
            "targetpaths": ["file", "file.txt", "..file.ext", "f.le"],
        },
        "consistent_snaphot enabled without prefixed targets": {
            "consistent_snapshot": True,
            "prefix_targets": False,
            "hash_algo": None,
            "targetpaths": ["file", "file.txt", "..file.ext", "f.le"],
        },
        "consistent_snaphot enabled with prefixed targets": {
            "consistent_snapshot": True,
            "prefix_targets": True,
            "hash_algo": "sha256",
            "targetpaths": ["file", "file.txt", "..file.ext", "f.le"],
        },
    }

    @utils.run_sub_tests_with_dataset(targets_download_data)
    def test_download_targets(self, test_case_data: dict[str, Any]) -> None:
        # Check that target files are requested with the hash prefix matching
        # the 'consistent_snapshot' / 'prefix_targets_with_hash' settings and
        # stored locally under their plain target path
        try:
            use_consistent: bool = test_case_data["consistent_snapshot"]
            use_prefix: bool = test_case_data["prefix_targets"]
            algo: str | None = test_case_data["hash_algo"]
            paths: list[str] = test_case_data["targetpaths"]

            self.setup_subtest(use_consistent, use_prefix)

            # every added target comes with a bump of the targets version
            for new_path in paths:
                self.sim.targets.version += 1
                self.sim.add_target("targets", b"content", new_path)
            self.sim.update_snapshot()

            updater = self._init_updater()
            updater.config.prefix_targets_with_hash = use_prefix
            updater.refresh()

            for path in paths:
                info = updater.get_targetinfo(path)
                assert isinstance(info, TargetFile)
                updater.download_target(info)

                # stored without a hash prefix
                self._assert_targets_files_exist([info.path])

                # fetched with the expected hash prefix (or None)
                expected_prefix = info.hashes[algo] if algo else None
                self.assertListEqual(
                    self.sim.fetch_tracker.targets, [(path, expected_prefix)]
                )
                # reset so the next download is checked in isolation
                self.sim.fetch_tracker.targets.clear()
        finally:
            self.teardown_subtest()
if __name__ == "__main__":
    # "--dump" is a flag of this test script itself: it enables repository
    # state dumps into a new temporary directory, and is removed from
    # sys.argv again before the remaining arguments are used below.
    DUMP_FLAG = "--dump"
    if DUMP_FLAG in sys.argv:
        TestConsistentSnapshot.dump_dir = tempfile.mkdtemp()
        print(
            f"Repository Simulator dumps in {TestConsistentSnapshot.dump_dir}"
        )
        sys.argv.remove(DUMP_FLAG)

    utils.configure_test_logging(sys.argv)
    unittest.main()