mirror of
https://github.com/theupdateframework/python-tuf
synced 2026-05-24 10:08:28 +00:00
Add TestTargetFileSearch class
Extend test_updater_delegation_graphs.py with tests for targets metadata search. - create a new test class TestTargetFileSearch which creates a single repository and pefrorms multiple file searches in subtests. - group the common functionality in a base class TestDelegations. - extend the data classes to accomodate for target_files. Signed-off-by: Teodora Sechkova <tsechkova@vmware.com>
This commit is contained in:
parent
4fc2c19ba4
commit
6fa5d3ddd2
1 changed files with 137 additions and 52 deletions
|
|
@ -3,15 +3,15 @@
|
|||
# Copyright 2021, New York University and the TUF contributors
|
||||
# SPDX-License-Identifier: MIT OR Apache-2.0
|
||||
|
||||
"""Test updating delegated targets roles with various
|
||||
delegation hierarchies"""
|
||||
"""Test updating delegated targets roles and searching for
|
||||
target files with various delegation graphs"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import tempfile
|
||||
import unittest
|
||||
from dataclasses import astuple, dataclass, field
|
||||
from typing import Iterable, List, Optional
|
||||
from typing import Iterable, List, Optional, Tuple
|
||||
|
||||
from tests import utils
|
||||
from tests.repository_simulator import RepositorySimulator
|
||||
|
|
@ -35,18 +35,25 @@ class TestDelegation:
|
|||
path_hash_prefixes: Optional[List[str]] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class TestTarget:
|
||||
rolename: str
|
||||
content: bytes
|
||||
targetpath: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class DelegationsTestCase:
|
||||
"""Describes a delegations graph as a list of delegations
|
||||
and the expected order of traversal as 'visited_order'."""
|
||||
"""A delegations graph as lists of delegations and target files
|
||||
and the expected order of traversal as a list of role names."""
|
||||
|
||||
delegations: List[TestDelegation]
|
||||
visited_order: List[str]
|
||||
target_files: List[TestTarget] = field(default_factory=list)
|
||||
visited_order: List[str] = field(default_factory=list)
|
||||
|
||||
|
||||
class TestDelegationsGraphs(unittest.TestCase):
|
||||
"""Test creating delegations graphs with different complexity
|
||||
and successfully updating the delegated roles metadata"""
|
||||
class TestDelegations(unittest.TestCase):
|
||||
"""Base class for delegation tests"""
|
||||
|
||||
# set dump_dir to trigger repository state dumps
|
||||
dump_dir: Optional[str] = None
|
||||
|
|
@ -59,70 +66,73 @@ def setUp(self) -> None:
|
|||
self.targets_dir = os.path.join(self.temp_dir.name, "targets")
|
||||
os.mkdir(self.metadata_dir)
|
||||
os.mkdir(self.targets_dir)
|
||||
self.sim: RepositorySimulator
|
||||
|
||||
def tearDown(self) -> None:
|
||||
self.temp_dir.cleanup()
|
||||
|
||||
def setup_subtest(
|
||||
self, delegations: List[TestDelegation]
|
||||
) -> RepositorySimulator:
|
||||
sim = self._init_repo(delegations)
|
||||
|
||||
def setup_subtest(self) -> None:
|
||||
self.subtest_count += 1
|
||||
if self.dump_dir is not None:
|
||||
# create subtest dumpdir
|
||||
name = f"{self.id().split('.')[-1]}-{self.subtest_count}"
|
||||
sim.dump_dir = os.path.join(self.dump_dir, name)
|
||||
os.mkdir(sim.dump_dir)
|
||||
self.sim.dump_dir = os.path.join(self.dump_dir, name)
|
||||
os.mkdir(self.sim.dump_dir)
|
||||
# dump the repo simulator metadata
|
||||
sim.write()
|
||||
|
||||
return sim
|
||||
self.sim.write()
|
||||
|
||||
def teardown_subtest(self) -> None:
|
||||
# clean up after each subtest
|
||||
utils.cleanup_dir(self.metadata_dir)
|
||||
|
||||
def _init_updater(self, sim: RepositorySimulator) -> Updater:
|
||||
def _init_repo(self, test_case: DelegationsTestCase) -> None:
|
||||
"""Create a new RepositorySimulator instance and
|
||||
populate it with delegations and target files"""
|
||||
|
||||
self.sim = RepositorySimulator()
|
||||
spec_version = ".".join(SPECIFICATION_VERSION)
|
||||
for d in test_case.delegations:
|
||||
if d.rolename in self.sim.md_delegates:
|
||||
targets = self.sim.md_delegates[d.rolename].signed
|
||||
else:
|
||||
targets = Targets(
|
||||
1, spec_version, self.sim.safe_expiry, {}, None
|
||||
)
|
||||
# unpack 'd' but skip "delegator"
|
||||
role = DelegatedRole(*astuple(d)[1:])
|
||||
self.sim.add_delegation(d.delegator, role, targets)
|
||||
|
||||
for target in test_case.target_files:
|
||||
self.sim.add_target(*astuple(target))
|
||||
|
||||
if test_case.target_files:
|
||||
self.sim.targets.version += 1
|
||||
self.sim.update_snapshot()
|
||||
|
||||
def _init_updater(self) -> Updater:
|
||||
"""Create a new Updater instance"""
|
||||
# Init trusted root for Updater
|
||||
with open(os.path.join(self.metadata_dir, "root.json"), "bw") as f:
|
||||
f.write(self.sim.signed_roots[0])
|
||||
|
||||
return Updater(
|
||||
self.metadata_dir,
|
||||
"https://example.com/metadata/",
|
||||
self.targets_dir,
|
||||
"https://example.com/targets/",
|
||||
sim,
|
||||
self.sim,
|
||||
)
|
||||
|
||||
def _init_repo(
|
||||
self, delegations: List[TestDelegation]
|
||||
) -> RepositorySimulator:
|
||||
"""Create a new RepositorySimulator instance with 'delegations'"""
|
||||
sim = RepositorySimulator()
|
||||
spec_version = ".".join(SPECIFICATION_VERSION)
|
||||
|
||||
for d in delegations:
|
||||
if d.rolename in sim.md_delegates:
|
||||
targets = sim.md_delegates[d.rolename].signed
|
||||
else:
|
||||
targets = Targets(1, spec_version, sim.safe_expiry, {}, None)
|
||||
|
||||
# unpack 'd' but skip "delegator"
|
||||
role = DelegatedRole(*astuple(d)[1:])
|
||||
sim.add_delegation(d.delegator, role, targets)
|
||||
sim.update_snapshot()
|
||||
|
||||
# Init trusted root for Updater
|
||||
with open(os.path.join(self.metadata_dir, "root.json"), "bw") as f:
|
||||
f.write(sim.signed_roots[0])
|
||||
|
||||
return sim
|
||||
|
||||
def _assert_files_exist(self, roles: Iterable[str]) -> None:
|
||||
"""Assert that local metadata files exist for 'roles'"""
|
||||
expected_files = sorted([f"{role}.json" for role in roles])
|
||||
local_metadata_files = sorted(os.listdir(self.metadata_dir))
|
||||
self.assertListEqual(local_metadata_files, expected_files)
|
||||
|
||||
|
||||
class TestDelegationsGraphs(TestDelegations):
|
||||
"""Test creating delegations graphs with different complexity
|
||||
and successfully updating the delegated roles metadata"""
|
||||
|
||||
graphs: utils.DataSet = {
|
||||
"basic delegation": DelegationsTestCase(
|
||||
delegations=[TestDelegation("targets", "A")],
|
||||
|
|
@ -248,13 +258,15 @@ def test_graph_traversal(self, test_data: DelegationsTestCase) -> None:
|
|||
exp_files = [*TOP_LEVEL_ROLE_NAMES, *test_data.visited_order]
|
||||
exp_calls = [(role, 1) for role in test_data.visited_order]
|
||||
|
||||
sim = self.setup_subtest(test_data.delegations)
|
||||
updater = self._init_updater(sim)
|
||||
self._init_repo(test_data)
|
||||
self.setup_subtest()
|
||||
|
||||
updater = self._init_updater()
|
||||
# restrict the max number of delegations to simplify the test
|
||||
updater.config.max_delegations = 4
|
||||
# Call explicitly refresh to simplify the expected_calls list
|
||||
updater.refresh()
|
||||
sim.fetch_tracker.metadata.clear()
|
||||
self.sim.fetch_tracker.metadata.clear()
|
||||
# Check that metadata dir contains only top-level roles
|
||||
self._assert_files_exist(TOP_LEVEL_ROLE_NAMES)
|
||||
|
||||
|
|
@ -264,7 +276,80 @@ def test_graph_traversal(self, test_data: DelegationsTestCase) -> None:
|
|||
self.assertIsNone(targetfile)
|
||||
# Check that the delegated roles were visited in the expected
|
||||
# order and the corresponding metadata files were persisted
|
||||
self.assertListEqual(sim.fetch_tracker.metadata, exp_calls)
|
||||
self.assertListEqual(self.sim.fetch_tracker.metadata, exp_calls)
|
||||
self._assert_files_exist(exp_files)
|
||||
finally:
|
||||
self.teardown_subtest()
|
||||
|
||||
|
||||
class TestTargetFileSearch(TestDelegations):
|
||||
"""
|
||||
Create a single repository with the following delegations:
|
||||
|
||||
targets
|
||||
*.doc, *md / \ release/*/*
|
||||
A B
|
||||
release/x/* / \ release/y/*.zip
|
||||
C D
|
||||
|
||||
Test that Updater successfully finds the target files metadata,
|
||||
traversing the delegations as expected.
|
||||
"""
|
||||
|
||||
delegations_tree = DelegationsTestCase(
|
||||
delegations=[
|
||||
TestDelegation("targets", "A", paths=["*.doc", "*.md"]),
|
||||
TestDelegation("targets", "B", paths=["releases/*/*"]),
|
||||
TestDelegation("B", "C", paths=["releases/x/*"]),
|
||||
TestDelegation("B", "D", paths=["releases/y/*.zip"]),
|
||||
],
|
||||
target_files=[
|
||||
TestTarget("targets", b"targetfile content", "targetfile"),
|
||||
TestTarget("A", b"README by A", "README.md"),
|
||||
TestTarget("C", b"x release by C", "releases/x/x_v1"),
|
||||
TestTarget("D", b"y release by D", "releases/y/y_v1.zip"),
|
||||
TestTarget("D", b"z release by D", "releases/z/z_v1.zip"),
|
||||
],
|
||||
)
|
||||
|
||||
def setUp(self) -> None:
|
||||
super().setUp()
|
||||
self._init_repo(self.delegations_tree)
|
||||
|
||||
# fmt: off
|
||||
targets: utils.DataSet = {
|
||||
"target found, no delegations": ("targetfile", True, []),
|
||||
"targetpath matches wildcard": ("README.md", True, ["A"]),
|
||||
"targetpath with separators x": ("releases/x/x_v1", True, ["B", "C"]),
|
||||
"targetpath with separators y": ("releases/y/y_v1.zip", True, ["B", "D"]),
|
||||
"target exists, path is not delegated": ("releases/z/z_v1.zip", False, ["B"]),
|
||||
}
|
||||
# fmt: on
|
||||
|
||||
@utils.run_sub_tests_with_dataset(targets)
|
||||
def test_targetfile_search(
|
||||
self, test_data: Tuple[str, bool, List[str]]
|
||||
) -> None:
|
||||
try:
|
||||
self.setup_subtest()
|
||||
targetpath, found, visited_order = test_data
|
||||
exp_files = [*TOP_LEVEL_ROLE_NAMES, *visited_order]
|
||||
exp_calls = [(role, 1) for role in visited_order]
|
||||
updater = self._init_updater()
|
||||
# Call explicitly refresh to simplify the expected_calls list
|
||||
updater.refresh()
|
||||
self.sim.fetch_tracker.metadata.clear()
|
||||
target = updater.get_targetinfo(targetpath)
|
||||
if target is not None:
|
||||
# Confirm that the expected TargetFile is found
|
||||
self.assertTrue(found)
|
||||
exp_target = self.sim.target_files[targetpath].target_file
|
||||
self.assertDictEqual(target.to_dict(), exp_target.to_dict())
|
||||
else:
|
||||
self.assertFalse(found)
|
||||
# Check that the delegated roles were visited in the expected
|
||||
# order and the corresponding metadata files were persisted
|
||||
self.assertListEqual(self.sim.fetch_tracker.metadata, exp_calls)
|
||||
self._assert_files_exist(exp_files)
|
||||
finally:
|
||||
self.teardown_subtest()
|
||||
|
|
@ -272,8 +357,8 @@ def test_graph_traversal(self, test_data: DelegationsTestCase) -> None:
|
|||
|
||||
if __name__ == "__main__":
|
||||
if "--dump" in sys.argv:
|
||||
TestDelegationsGraphs.dump_dir = tempfile.mkdtemp()
|
||||
print(f"Repository Simulator dumps in {TestDelegationsGraphs.dump_dir}")
|
||||
TestDelegations.dump_dir = tempfile.mkdtemp()
|
||||
print(f"Repository Simulator dumps in {TestDelegations.dump_dir}")
|
||||
sys.argv.remove("--dump")
|
||||
|
||||
utils.configure_test_logging(sys.argv)
|
||||
|
|
|
|||
Loading…
Reference in a new issue