From 6fa5d3ddd2fe30d587ea3a8e88e5efe5cd574345 Mon Sep 17 00:00:00 2001 From: Teodora Sechkova Date: Tue, 7 Dec 2021 14:15:52 +0200 Subject: [PATCH] Add TestTargetFileSearch class Extend test_updater_delegation_graphs.py with tests for targets metadata search. - create a new test class TestTargetFileSearch which creates a single repository and pefrorms multiple file searches in subtests. - group the common functionality in a base class TestDelegations. - extend the data classes to accomodate for target_files. Signed-off-by: Teodora Sechkova --- tests/test_updater_delegation_graphs.py | 189 +++++++++++++++++------- 1 file changed, 137 insertions(+), 52 deletions(-) diff --git a/tests/test_updater_delegation_graphs.py b/tests/test_updater_delegation_graphs.py index 4a7d5c59..3a983bc1 100644 --- a/tests/test_updater_delegation_graphs.py +++ b/tests/test_updater_delegation_graphs.py @@ -3,15 +3,15 @@ # Copyright 2021, New York University and the TUF contributors # SPDX-License-Identifier: MIT OR Apache-2.0 -"""Test updating delegated targets roles with various -delegation hierarchies""" +"""Test updating delegated targets roles and searching for +target files with various delegation graphs""" import os import sys import tempfile import unittest from dataclasses import astuple, dataclass, field -from typing import Iterable, List, Optional +from typing import Iterable, List, Optional, Tuple from tests import utils from tests.repository_simulator import RepositorySimulator @@ -35,18 +35,25 @@ class TestDelegation: path_hash_prefixes: Optional[List[str]] = None +@dataclass +class TestTarget: + rolename: str + content: bytes + targetpath: str + + @dataclass class DelegationsTestCase: - """Describes a delegations graph as a list of delegations - and the expected order of traversal as 'visited_order'.""" + """A delegations graph as lists of delegations and target files + and the expected order of traversal as a list of role names.""" delegations: List[TestDelegation] - visited_order: List[str] + target_files: List[TestTarget] = field(default_factory=list) + visited_order: List[str] = field(default_factory=list) -class TestDelegationsGraphs(unittest.TestCase): - """Test creating delegations graphs with different complexity - and successfully updating the delegated roles metadata""" +class TestDelegations(unittest.TestCase): + """Base class for delegation tests""" # set dump_dir to trigger repository state dumps dump_dir: Optional[str] = None @@ -59,70 +66,73 @@ def setUp(self) -> None: self.targets_dir = os.path.join(self.temp_dir.name, "targets") os.mkdir(self.metadata_dir) os.mkdir(self.targets_dir) + self.sim: RepositorySimulator def tearDown(self) -> None: self.temp_dir.cleanup() - def setup_subtest( - self, delegations: List[TestDelegation] - ) -> RepositorySimulator: - sim = self._init_repo(delegations) - + def setup_subtest(self) -> None: self.subtest_count += 1 if self.dump_dir is not None: # create subtest dumpdir name = f"{self.id().split('.')[-1]}-{self.subtest_count}" - sim.dump_dir = os.path.join(self.dump_dir, name) - os.mkdir(sim.dump_dir) + self.sim.dump_dir = os.path.join(self.dump_dir, name) + os.mkdir(self.sim.dump_dir) # dump the repo simulator metadata - sim.write() - - return sim + self.sim.write() def teardown_subtest(self) -> None: - # clean up after each subtest utils.cleanup_dir(self.metadata_dir) - def _init_updater(self, sim: RepositorySimulator) -> Updater: + def _init_repo(self, test_case: DelegationsTestCase) -> None: + """Create a new RepositorySimulator instance and + populate it with delegations and target files""" + + self.sim = RepositorySimulator() + spec_version = ".".join(SPECIFICATION_VERSION) + for d in test_case.delegations: + if d.rolename in self.sim.md_delegates: + targets = self.sim.md_delegates[d.rolename].signed + else: + targets = Targets( + 1, spec_version, self.sim.safe_expiry, {}, None + ) + # unpack 'd' but skip "delegator" + role = DelegatedRole(*astuple(d)[1:]) + self.sim.add_delegation(d.delegator, role, targets) + + for target in test_case.target_files: + self.sim.add_target(*astuple(target)) + + if test_case.target_files: + self.sim.targets.version += 1 + self.sim.update_snapshot() + + def _init_updater(self) -> Updater: """Create a new Updater instance""" + # Init trusted root for Updater + with open(os.path.join(self.metadata_dir, "root.json"), "bw") as f: + f.write(self.sim.signed_roots[0]) + return Updater( self.metadata_dir, "https://example.com/metadata/", self.targets_dir, "https://example.com/targets/", - sim, + self.sim, ) - def _init_repo( - self, delegations: List[TestDelegation] - ) -> RepositorySimulator: - """Create a new RepositorySimulator instance with 'delegations'""" - sim = RepositorySimulator() - spec_version = ".".join(SPECIFICATION_VERSION) - - for d in delegations: - if d.rolename in sim.md_delegates: - targets = sim.md_delegates[d.rolename].signed - else: - targets = Targets(1, spec_version, sim.safe_expiry, {}, None) - - # unpack 'd' but skip "delegator" - role = DelegatedRole(*astuple(d)[1:]) - sim.add_delegation(d.delegator, role, targets) - sim.update_snapshot() - - # Init trusted root for Updater - with open(os.path.join(self.metadata_dir, "root.json"), "bw") as f: - f.write(sim.signed_roots[0]) - - return sim - def _assert_files_exist(self, roles: Iterable[str]) -> None: """Assert that local metadata files exist for 'roles'""" expected_files = sorted([f"{role}.json" for role in roles]) local_metadata_files = sorted(os.listdir(self.metadata_dir)) self.assertListEqual(local_metadata_files, expected_files) + +class TestDelegationsGraphs(TestDelegations): + """Test creating delegations graphs with different complexity + and successfully updating the delegated roles metadata""" + graphs: utils.DataSet = { "basic delegation": DelegationsTestCase( delegations=[TestDelegation("targets", "A")], @@ -248,13 +258,15 @@ def test_graph_traversal(self, test_data: DelegationsTestCase) -> None: exp_files = [*TOP_LEVEL_ROLE_NAMES, *test_data.visited_order] exp_calls = [(role, 1) for role in test_data.visited_order] - sim = self.setup_subtest(test_data.delegations) - updater = self._init_updater(sim) + self._init_repo(test_data) + self.setup_subtest() + + updater = self._init_updater() # restrict the max number of delegations to simplify the test updater.config.max_delegations = 4 # Call explicitly refresh to simplify the expected_calls list updater.refresh() - sim.fetch_tracker.metadata.clear() + self.sim.fetch_tracker.metadata.clear() # Check that metadata dir contains only top-level roles self._assert_files_exist(TOP_LEVEL_ROLE_NAMES) @@ -264,7 +276,80 @@ def test_graph_traversal(self, test_data: DelegationsTestCase) -> None: self.assertIsNone(targetfile) # Check that the delegated roles were visited in the expected # order and the corresponding metadata files were persisted - self.assertListEqual(sim.fetch_tracker.metadata, exp_calls) + self.assertListEqual(self.sim.fetch_tracker.metadata, exp_calls) + self._assert_files_exist(exp_files) + finally: + self.teardown_subtest() + + +class TestTargetFileSearch(TestDelegations): + """ + Create a single repository with the following delegations: + + targets + *.doc, *md / \ release/*/* + A B + release/x/* / \ release/y/*.zip + C D + + Test that Updater successfully finds the target files metadata, + traversing the delegations as expected. + """ + + delegations_tree = DelegationsTestCase( + delegations=[ + TestDelegation("targets", "A", paths=["*.doc", "*.md"]), + TestDelegation("targets", "B", paths=["releases/*/*"]), + TestDelegation("B", "C", paths=["releases/x/*"]), + TestDelegation("B", "D", paths=["releases/y/*.zip"]), + ], + target_files=[ + TestTarget("targets", b"targetfile content", "targetfile"), + TestTarget("A", b"README by A", "README.md"), + TestTarget("C", b"x release by C", "releases/x/x_v1"), + TestTarget("D", b"y release by D", "releases/y/y_v1.zip"), + TestTarget("D", b"z release by D", "releases/z/z_v1.zip"), + ], + ) + + def setUp(self) -> None: + super().setUp() + self._init_repo(self.delegations_tree) + + # fmt: off + targets: utils.DataSet = { + "target found, no delegations": ("targetfile", True, []), + "targetpath matches wildcard": ("README.md", True, ["A"]), + "targetpath with separators x": ("releases/x/x_v1", True, ["B", "C"]), + "targetpath with separators y": ("releases/y/y_v1.zip", True, ["B", "D"]), + "target exists, path is not delegated": ("releases/z/z_v1.zip", False, ["B"]), + } + # fmt: on + + @utils.run_sub_tests_with_dataset(targets) + def test_targetfile_search( + self, test_data: Tuple[str, bool, List[str]] + ) -> None: + try: + self.setup_subtest() + targetpath, found, visited_order = test_data + exp_files = [*TOP_LEVEL_ROLE_NAMES, *visited_order] + exp_calls = [(role, 1) for role in visited_order] + updater = self._init_updater() + # Call explicitly refresh to simplify the expected_calls list + updater.refresh() + self.sim.fetch_tracker.metadata.clear() + target = updater.get_targetinfo(targetpath) + if target is not None: + # Confirm that the expected TargetFile is found + self.assertTrue(found) + exp_target = self.sim.target_files[targetpath].target_file + self.assertDictEqual(target.to_dict(), exp_target.to_dict()) + else: + self.assertFalse(found) + # Check that the delegated roles were visited in the expected + # order and the corresponding metadata files were persisted + self.assertListEqual(self.sim.fetch_tracker.metadata, exp_calls) self._assert_files_exist(exp_files) finally: self.teardown_subtest() @@ -272,8 +357,8 @@ def test_graph_traversal(self, test_data: DelegationsTestCase) -> None: if __name__ == "__main__": if "--dump" in sys.argv: - TestDelegationsGraphs.dump_dir = tempfile.mkdtemp() - print(f"Repository Simulator dumps in {TestDelegationsGraphs.dump_dir}") + TestDelegations.dump_dir = tempfile.mkdtemp() + print(f"Repository Simulator dumps in {TestDelegations.dump_dir}") sys.argv.remove("--dump") utils.configure_test_logging(sys.argv)