2021-11-12 15:26:42 +00:00
|
|
|
# Copyright 2021, New York University and the TUF contributors
|
|
|
|
|
# SPDX-License-Identifier: MIT OR Apache-2.0
|
|
|
|
|
|
|
|
|
|
"""Test ngclient Updater toggling consistent snapshot"""
|
|
|
|
|
|
2024-11-29 10:29:32 +00:00
|
|
|
from __future__ import annotations
|
|
|
|
|
|
2021-11-12 15:26:42 +00:00
|
|
|
import os
|
|
|
|
|
import sys
|
|
|
|
|
import tempfile
|
|
|
|
|
import unittest
|
2024-11-29 10:29:32 +00:00
|
|
|
from typing import TYPE_CHECKING, Any
|
2021-11-12 15:26:42 +00:00
|
|
|
|
|
|
|
|
from tests import utils
|
|
|
|
|
from tests.repository_simulator import RepositorySimulator
|
|
|
|
|
from tuf.api.metadata import (
|
|
|
|
|
SPECIFICATION_VERSION,
|
|
|
|
|
TOP_LEVEL_ROLE_NAMES,
|
2021-11-22 14:28:43 +00:00
|
|
|
DelegatedRole,
|
2021-12-03 16:47:31 +00:00
|
|
|
TargetFile,
|
2021-11-12 15:26:42 +00:00
|
|
|
Targets,
|
|
|
|
|
)
|
|
|
|
|
from tuf.ngclient import Updater
|
|
|
|
|
|
2024-11-29 10:29:32 +00:00
|
|
|
if TYPE_CHECKING:
|
|
|
|
|
from collections.abc import Iterable
|
|
|
|
|
|
2021-11-12 15:26:42 +00:00
|
|
|
|
|
|
|
|
class TestConsistentSnapshot(unittest.TestCase):
    """Test different combinations of 'consistent_snapshot' and
    'prefix_targets_with_hash' and verify that the correct URLs
    are formed for each combination"""

    # set dump_dir to trigger repository state dumps
    dump_dir: str | None = None

    def setUp(self) -> None:
        """Create a temporary directory holding empty 'metadata' and
        'targets' subdirectories and reset the subtest counter"""
        self.subtest_count = 0
        self.temp_dir = tempfile.TemporaryDirectory()
        self.metadata_dir = os.path.join(self.temp_dir.name, "metadata")
        self.targets_dir = os.path.join(self.temp_dir.name, "targets")
        os.mkdir(self.metadata_dir)
        os.mkdir(self.targets_dir)
        # Declaration only: the simulator itself is created per subtest in
        # setup_subtest()
        self.sim: RepositorySimulator

    def tearDown(self) -> None:
        """Remove the temporary directory created in setUp()"""
        self.temp_dir.cleanup()

    def setup_subtest(
        self, consistent_snapshot: bool, prefix_targets: bool = True
    ) -> None:
        """Create a new repository simulator for one subtest

        If 'dump_dir' is set, a subdirectory named after the running test
        and the subtest count is created and assigned to the simulator.
        """
        self.sim = self._init_repo(consistent_snapshot, prefix_targets)

        self.subtest_count += 1
        if self.dump_dir is not None:
            # create subtest dumpdir
            name = f"{self.id().split('.')[-1]}-{self.subtest_count}"
            self.sim.dump_dir = os.path.join(self.dump_dir, name)
            os.mkdir(self.sim.dump_dir)

    def teardown_subtest(self) -> None:
        """Write the simulator state if 'dump_dir' is set, then clean up
        the local metadata directory so the next subtest starts fresh"""
        if self.dump_dir is not None:
            self.sim.write()

        utils.cleanup_metadata_dir(self.metadata_dir)

    def _init_repo(
        self, consistent_snapshot: bool, prefix_targets: bool = True
    ) -> RepositorySimulator:
        """Create a new RepositorySimulator instance"""
        sim = RepositorySimulator()
        sim.root.consistent_snapshot = consistent_snapshot
        # publish a new root version carrying the requested setting
        sim.root.version += 1
        sim.publish_root()
        sim.prefix_targets_with_hash = prefix_targets

        return sim

    def _init_updater(self) -> Updater:
        """Create a new Updater instance"""
        return Updater(
            self.metadata_dir,
            "https://example.com/metadata/",
            self.targets_dir,
            "https://example.com/targets/",
            self.sim,
            bootstrap=self.sim.signed_roots[-1],
        )

    def _assert_metadata_files_exist(self, roles: Iterable[str]) -> None:
        """Assert that local metadata files exist for 'roles'"""
        local_metadata_files = os.listdir(self.metadata_dir)
        for role in roles:
            self.assertIn(f"{role}.json", local_metadata_files)

    def _assert_targets_files_exist(self, filenames: Iterable[str]) -> None:
        """Assert that local files with 'filenames' exist"""
        local_target_files = os.listdir(self.targets_dir)
        for filename in filenames:
            self.assertIn(filename, local_target_files)

    # Each "calls" entry is a (rolename, version or None) tuple that is
    # compared against the simulator's fetch tracker after a refresh
    top_level_roles_data = {
        "consistent_snapshot disabled": {
            "consistent_snapshot": False,
            "calls": [
                ("root", 3),
                ("timestamp", None),
                ("snapshot", None),
                ("targets", None),
            ],
        },
        "consistent_snapshot enabled": {
            "consistent_snapshot": True,
            "calls": [
                ("root", 3),
                ("timestamp", None),
                ("snapshot", 1),
                ("targets", 1),
            ],
        },
    }

    @utils.run_sub_tests_with_dataset(top_level_roles_data)
    def test_top_level_roles_update(
        self, test_case_data: dict[str, Any]
    ) -> None:
        # Test if the client fetches and stores metadata files with the
        # correct version prefix, depending on 'consistent_snapshot' config
        try:
            consistent_snapshot: bool = test_case_data["consistent_snapshot"]
            exp_calls: list[Any] = test_case_data["calls"]

            self.setup_subtest(consistent_snapshot)
            updater = self._init_updater()

            # cleanup fetch tracker metadata
            self.sim.fetch_tracker.metadata.clear()
            updater.refresh()

            # metadata files are fetched with the expected version (or None)
            self.assertListEqual(self.sim.fetch_tracker.metadata, exp_calls)
            # metadata files are always persisted without a version prefix
            self._assert_metadata_files_exist(TOP_LEVEL_ROLE_NAMES)
        finally:
            self.teardown_subtest()

    # "expected_version" is the version (or None) expected in every
    # fetch tracker entry for the delegated roles
    delegated_roles_data = {
        "consistent_snapshot disabled": {
            "consistent_snapshot": False,
            "expected_version": None,
        },
        "consistent_snapshot enabled": {
            "consistent_snapshot": True,
            "expected_version": 1,
        },
    }

    @utils.run_sub_tests_with_dataset(delegated_roles_data)
    def test_delegated_roles_update(
        self, test_case_data: dict[str, Any]
    ) -> None:
        # Test if the client fetches and stores delegated metadata files with
        # the correct version prefix, depending on 'consistent_snapshot' config
        try:
            consistent_snapshot: bool = test_case_data["consistent_snapshot"]
            exp_version: int | None = test_case_data["expected_version"]
            rolenames = ["role1", "..", "."]
            exp_calls = [(role, exp_version) for role in rolenames]

            self.setup_subtest(consistent_snapshot)
            # Add new delegated targets
            spec_version = ".".join(SPECIFICATION_VERSION)
            for role in rolenames:
                delegated_role = DelegatedRole(role, [], 1, False, ["*"], None)
                targets = Targets(
                    1, spec_version, self.sim.safe_expiry, {}, None
                )
                self.sim.add_delegation("targets", delegated_role, targets)
            self.sim.update_snapshot()
            updater = self._init_updater()
            updater.refresh()

            # cleanup fetch tracker metadata
            self.sim.fetch_tracker.metadata.clear()
            # trigger updater to fetch the delegated metadata
            updater.get_targetinfo("anything")
            # metadata files are fetched with the expected version (or None)
            self.assertListEqual(self.sim.fetch_tracker.metadata, exp_calls)
            # metadata files are always persisted without a version prefix
            self._assert_metadata_files_exist(rolenames)
        finally:
            self.teardown_subtest()

    # "hash_algo" names the hash expected in each fetch tracker entry for
    # a target, or None when no hash is expected
    targets_download_data = {
        "consistent_snapshot disabled": {
            "consistent_snapshot": False,
            "prefix_targets": True,
            "hash_algo": None,
            "targetpaths": ["file", "file.txt", "..file.ext", "f.le"],
        },
        "consistent_snapshot enabled without prefixed targets": {
            "consistent_snapshot": True,
            "prefix_targets": False,
            "hash_algo": None,
            "targetpaths": ["file", "file.txt", "..file.ext", "f.le"],
        },
        "consistent_snapshot enabled with prefixed targets": {
            "consistent_snapshot": True,
            "prefix_targets": True,
            "hash_algo": "sha256",
            "targetpaths": ["file", "file.txt", "..file.ext", "f.le"],
        },
    }

    @utils.run_sub_tests_with_dataset(targets_download_data)
    def test_download_targets(self, test_case_data: dict[str, Any]) -> None:
        # Test if the client fetches and stores target files with
        # the correct hash prefix, depending on 'consistent_snapshot'
        # and 'prefix_targets_with_hash' config
        try:
            consistent_snapshot: bool = test_case_data["consistent_snapshot"]
            prefix_targets_with_hash: bool = test_case_data["prefix_targets"]
            hash_algo: str | None = test_case_data["hash_algo"]
            targetpaths: list[str] = test_case_data["targetpaths"]

            self.setup_subtest(consistent_snapshot, prefix_targets_with_hash)
            # Add targets to repository
            for targetpath in targetpaths:
                self.sim.targets.version += 1
                self.sim.add_target("targets", b"content", targetpath)
            self.sim.update_snapshot()

            updater = self._init_updater()
            updater.config.prefix_targets_with_hash = prefix_targets_with_hash
            updater.refresh()

            for path in targetpaths:
                info = updater.get_targetinfo(path)
                # Explicit check instead of a bare assert: it still runs
                # under "python -O" and reports which path was affected
                if not isinstance(info, TargetFile):
                    self.fail(f"expected TargetFile for {path!r}, got {info!r}")
                updater.download_target(info)

                # target files are always persisted without hash prefix
                self._assert_targets_files_exist([info.path])

                # files are fetched with the expected hash prefix (or None)
                exp_calls = [
                    (path, None if not hash_algo else info.hashes[hash_algo])
                ]

                self.assertListEqual(self.sim.fetch_tracker.targets, exp_calls)
                # reset so the next path is compared against its own fetch
                self.sim.fetch_tracker.targets.clear()
        finally:
            self.teardown_subtest()
|
2021-11-12 15:26:42 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # "--dump" is a private flag of this script: when present, simulator
    # dumps are enabled for every subtest by setting the class-level dump_dir
    dump_requested = "--dump" in sys.argv
    if dump_requested:
        dump_root = tempfile.mkdtemp()
        TestConsistentSnapshot.dump_dir = dump_root
        print(f"Repository Simulator dumps in {dump_root}")
        # drop the flag before sys.argv is handed on to the calls below
        sys.argv.remove("--dump")

    utils.configure_test_logging(sys.argv)
    unittest.main()
|