mirror of
https://github.com/theupdateframework/python-tuf
synced 2026-05-24 10:08:28 +00:00
599 lines
22 KiB
Python
599 lines
22 KiB
Python
# Copyright 2021, New York University and the TUF contributors
|
|
# SPDX-License-Identifier: MIT OR Apache-2.0
|
|
|
|
"""Test updating delegated targets roles and searching for
|
|
target files with various delegation graphs"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import sys
|
|
import tempfile
|
|
import unittest
|
|
from dataclasses import astuple, dataclass, field
|
|
from typing import TYPE_CHECKING
|
|
|
|
from tests import utils
|
|
from tests.repository_simulator import RepositorySimulator
|
|
from tuf.api.exceptions import UnsignedMetadataError
|
|
from tuf.api.metadata import (
|
|
SPECIFICATION_VERSION,
|
|
TOP_LEVEL_ROLE_NAMES,
|
|
DelegatedRole,
|
|
Targets,
|
|
)
|
|
from tuf.ngclient import Updater
|
|
|
|
if TYPE_CHECKING:
|
|
from collections.abc import Iterable
|
|
|
|
|
|
@dataclass
|
|
class TestDelegation:
|
|
delegator: str
|
|
rolename: str
|
|
keyids: list[str] = field(default_factory=list)
|
|
threshold: int = 1
|
|
terminating: bool = False
|
|
paths: list[str] | None = field(default_factory=lambda: ["*"])
|
|
path_hash_prefixes: list[str] | None = None
|
|
|
|
|
|
@dataclass
|
|
class TestTarget:
|
|
rolename: str
|
|
content: bytes
|
|
targetpath: str
|
|
|
|
|
|
@dataclass
|
|
class DelegationsTestCase:
|
|
"""A delegations graph as lists of delegations and target files
|
|
and the expected order of traversal as a list of role names."""
|
|
|
|
delegations: list[TestDelegation]
|
|
target_files: list[TestTarget] = field(default_factory=list)
|
|
visited_order: list[str] = field(default_factory=list)
|
|
|
|
|
|
@dataclass
|
|
class TargetTestCase:
|
|
targetpath: str
|
|
found: bool
|
|
visited_order: list[str] = field(default_factory=list)
|
|
|
|
|
|
class TestDelegations(unittest.TestCase):
|
|
"""Base class for delegation tests"""
|
|
|
|
# set dump_dir to trigger repository state dumps
|
|
dump_dir: str | None = None
|
|
|
|
def setUp(self) -> None:
|
|
self.subtest_count = 0
|
|
self.temp_dir = tempfile.TemporaryDirectory()
|
|
self.metadata_dir = os.path.join(self.temp_dir.name, "metadata")
|
|
self.targets_dir = os.path.join(self.temp_dir.name, "targets")
|
|
os.mkdir(self.metadata_dir)
|
|
os.mkdir(self.targets_dir)
|
|
self.sim: RepositorySimulator
|
|
|
|
def tearDown(self) -> None:
|
|
self.temp_dir.cleanup()
|
|
|
|
def setup_subtest(self) -> None:
|
|
self.subtest_count += 1
|
|
if self.dump_dir is not None:
|
|
# create subtest dumpdir
|
|
name = f"{self.id().split('.')[-1]}-{self.subtest_count}"
|
|
self.sim.dump_dir = os.path.join(self.dump_dir, name)
|
|
os.mkdir(self.sim.dump_dir)
|
|
# dump the repo simulator metadata
|
|
self.sim.write()
|
|
|
|
def teardown_subtest(self) -> None:
|
|
utils.cleanup_metadata_dir(self.metadata_dir)
|
|
|
|
def _init_repo(self, test_case: DelegationsTestCase) -> None:
|
|
"""Create a new RepositorySimulator instance and
|
|
populate it with delegations and target files"""
|
|
|
|
self.sim = RepositorySimulator()
|
|
spec_version = ".".join(SPECIFICATION_VERSION)
|
|
for d in test_case.delegations:
|
|
if d.rolename in self.sim.md_delegates:
|
|
targets = self.sim.md_delegates[d.rolename].signed
|
|
else:
|
|
targets = Targets(
|
|
1, spec_version, self.sim.safe_expiry, {}, None
|
|
)
|
|
# unpack 'd' but skip "delegator"
|
|
role = DelegatedRole(*astuple(d)[1:])
|
|
self.sim.add_delegation(d.delegator, role, targets)
|
|
|
|
for target in test_case.target_files:
|
|
self.sim.add_target(*astuple(target))
|
|
|
|
if test_case.target_files:
|
|
self.sim.targets.version += 1
|
|
self.sim.update_snapshot()
|
|
|
|
def _init_updater(self) -> Updater:
|
|
"""Create a new Updater instance"""
|
|
return Updater(
|
|
self.metadata_dir,
|
|
"https://example.com/metadata/",
|
|
self.targets_dir,
|
|
"https://example.com/targets/",
|
|
self.sim,
|
|
bootstrap=self.sim.signed_roots[0],
|
|
)
|
|
|
|
def _assert_files_exist(self, roles: Iterable[str]) -> None:
|
|
"""Assert that local metadata files match 'roles'"""
|
|
expected_files = [f"{role}.json" for role in roles]
|
|
found_files = [
|
|
e.name for e in os.scandir(self.metadata_dir) if e.is_file()
|
|
]
|
|
|
|
self.assertListEqual(sorted(found_files), sorted(expected_files))
|
|
|
|
|
|
class TestDelegationsGraphs(TestDelegations):
|
|
"""Test creating delegations graphs with different complexity
|
|
and successfully updating the delegated roles metadata"""
|
|
|
|
graphs = {
|
|
"basic delegation": DelegationsTestCase(
|
|
delegations=[TestDelegation("targets", "A")],
|
|
visited_order=["A"],
|
|
),
|
|
"single level delegations": DelegationsTestCase(
|
|
delegations=[
|
|
TestDelegation("targets", "A"),
|
|
TestDelegation("targets", "B"),
|
|
],
|
|
visited_order=["A", "B"],
|
|
),
|
|
"two-level delegations": DelegationsTestCase(
|
|
delegations=[
|
|
TestDelegation("targets", "A"),
|
|
TestDelegation("targets", "B"),
|
|
TestDelegation("B", "C"),
|
|
],
|
|
visited_order=["A", "B", "C"],
|
|
),
|
|
"two-level test DFS order of traversal": DelegationsTestCase(
|
|
delegations=[
|
|
TestDelegation("targets", "A"),
|
|
TestDelegation("targets", "B"),
|
|
TestDelegation("A", "C"),
|
|
TestDelegation("A", "D"),
|
|
],
|
|
visited_order=["A", "C", "D", "B"],
|
|
),
|
|
"three-level delegation test DFS order of traversal": DelegationsTestCase(
|
|
delegations=[
|
|
TestDelegation("targets", "A"),
|
|
TestDelegation("targets", "B"),
|
|
TestDelegation("A", "C"),
|
|
TestDelegation("C", "D"),
|
|
],
|
|
visited_order=["A", "C", "D", "B"],
|
|
),
|
|
"two-level terminating ignores all but role's descendants": DelegationsTestCase(
|
|
delegations=[
|
|
TestDelegation("targets", "A"),
|
|
TestDelegation("targets", "B"),
|
|
TestDelegation("A", "C", terminating=True),
|
|
TestDelegation("A", "D"),
|
|
],
|
|
visited_order=["A", "C"],
|
|
),
|
|
"three-level terminating ignores all but role's descendants": DelegationsTestCase(
|
|
delegations=[
|
|
TestDelegation("targets", "A"),
|
|
TestDelegation("targets", "B"),
|
|
TestDelegation("A", "C", terminating=True),
|
|
TestDelegation("C", "D"),
|
|
],
|
|
visited_order=["A", "C", "D"],
|
|
),
|
|
"two-level ignores all branches not matching 'paths'": DelegationsTestCase(
|
|
delegations=[
|
|
TestDelegation("targets", "A", paths=["*.py"]),
|
|
TestDelegation("targets", "B"),
|
|
TestDelegation("A", "C"),
|
|
],
|
|
visited_order=["B"],
|
|
),
|
|
"three-level ignores all branches not matching 'paths'": DelegationsTestCase(
|
|
delegations=[
|
|
TestDelegation("targets", "A"),
|
|
TestDelegation("targets", "B"),
|
|
TestDelegation("A", "C", paths=["*.py"]),
|
|
TestDelegation("C", "D"),
|
|
],
|
|
visited_order=["A", "B"],
|
|
),
|
|
"cyclic graph": DelegationsTestCase(
|
|
delegations=[
|
|
TestDelegation("targets", "A"),
|
|
TestDelegation("targets", "B"),
|
|
TestDelegation("B", "C"),
|
|
TestDelegation("C", "D"),
|
|
TestDelegation("D", "B"),
|
|
],
|
|
visited_order=["A", "B", "C", "D"],
|
|
),
|
|
"two roles delegating to a third": DelegationsTestCase(
|
|
delegations=[
|
|
TestDelegation("targets", "A"),
|
|
TestDelegation("targets", "B"),
|
|
TestDelegation("B", "C"),
|
|
TestDelegation("A", "C"),
|
|
],
|
|
# Under all same conditions, 'C' is reached through 'A' first"
|
|
visited_order=["A", "C", "B"],
|
|
),
|
|
"two roles delegating to a third different 'paths'": DelegationsTestCase(
|
|
delegations=[
|
|
TestDelegation("targets", "A"),
|
|
TestDelegation("targets", "B"),
|
|
TestDelegation("B", "C"),
|
|
TestDelegation("A", "C", paths=["*.py"]),
|
|
],
|
|
# 'C' is reached through 'B' since 'A' does not delegate a matching pattern"
|
|
visited_order=["A", "B", "C"],
|
|
),
|
|
"max number of delegations": DelegationsTestCase(
|
|
delegations=[
|
|
TestDelegation("targets", "A"),
|
|
TestDelegation("targets", "B"),
|
|
TestDelegation("targets", "C"),
|
|
TestDelegation("C", "D"),
|
|
TestDelegation("C", "E"),
|
|
],
|
|
# "E" is skipped, max_delegations is 4
|
|
visited_order=["A", "B", "C", "D"],
|
|
),
|
|
}
|
|
|
|
@utils.run_sub_tests_with_dataset(graphs)
|
|
def test_graph_traversal(self, test_data: DelegationsTestCase) -> None:
|
|
"""Test that delegated roles are traversed in the order of appearance
|
|
in the delegator's metadata, using pre-order depth-first search"""
|
|
|
|
try:
|
|
exp_files = [*TOP_LEVEL_ROLE_NAMES, *test_data.visited_order]
|
|
exp_calls = [(role, 1) for role in test_data.visited_order]
|
|
|
|
self._init_repo(test_data)
|
|
self.setup_subtest()
|
|
|
|
updater = self._init_updater()
|
|
# restrict the max number of delegations to simplify the test
|
|
updater.config.max_delegations = 4
|
|
# Call explicitly refresh to simplify the expected_calls list
|
|
updater.refresh()
|
|
self.sim.fetch_tracker.metadata.clear()
|
|
# Check that metadata dir contains only top-level roles
|
|
self._assert_files_exist(TOP_LEVEL_ROLE_NAMES)
|
|
|
|
# Looking for a non-existing targetpath forces updater
|
|
# to visit all possible delegated roles
|
|
targetfile = updater.get_targetinfo("missingpath")
|
|
self.assertIsNone(targetfile)
|
|
# Check that the delegated roles were visited in the expected
|
|
# order and the corresponding metadata files were persisted
|
|
self.assertListEqual(self.sim.fetch_tracker.metadata, exp_calls)
|
|
self._assert_files_exist(exp_files)
|
|
finally:
|
|
self.teardown_subtest()
|
|
|
|
invalid_metadata = {
|
|
"unsigned delegated role": DelegationsTestCase(
|
|
delegations=[
|
|
TestDelegation("targets", "invalid"),
|
|
TestDelegation("targets", "B"),
|
|
TestDelegation("invalid", "C"),
|
|
],
|
|
# The traversal stops after visiting an invalid role
|
|
visited_order=["invalid"],
|
|
)
|
|
}
|
|
|
|
@utils.run_sub_tests_with_dataset(invalid_metadata)
|
|
def test_invalid_metadata(self, test_data: DelegationsTestCase) -> None:
|
|
try:
|
|
self._init_repo(test_data)
|
|
# The invalid role is the last visited
|
|
invalid_role = test_data.visited_order[-1]
|
|
self.sim.signers[invalid_role].clear()
|
|
|
|
self.setup_subtest()
|
|
# The invalid role metadata must not be persisted
|
|
exp_files = [*TOP_LEVEL_ROLE_NAMES, *test_data.visited_order[:-1]]
|
|
exp_calls = [(role, 1) for role in test_data.visited_order]
|
|
|
|
updater = self._init_updater()
|
|
# Call explicitly refresh to simplify the expected_calls list
|
|
updater.refresh()
|
|
self.sim.fetch_tracker.metadata.clear()
|
|
|
|
with self.assertRaises(UnsignedMetadataError):
|
|
updater.get_targetinfo("missingpath")
|
|
# Check that there were no visited roles after the invalid one
|
|
# and only the valid metadata files were persisted
|
|
self.assertListEqual(self.sim.fetch_tracker.metadata, exp_calls)
|
|
self._assert_files_exist(exp_files)
|
|
finally:
|
|
self.teardown_subtest()
|
|
|
|
def test_safely_encoded_rolenames(self) -> None:
|
|
"""Test that delegated roles names are safely encoded in the filenames
|
|
and URLs.
|
|
"""
|
|
|
|
roles_to_filenames = {
|
|
"../a": "..%2Fa.json",
|
|
".": "..json",
|
|
"/": "%2F.json",
|
|
"ö": "%C3%B6.json",
|
|
}
|
|
|
|
delegations = []
|
|
for rolename in roles_to_filenames:
|
|
delegations.append(TestDelegation("targets", rolename))
|
|
|
|
delegated_rolenames = DelegationsTestCase(delegations)
|
|
self._init_repo(delegated_rolenames)
|
|
updater = self._init_updater()
|
|
updater.refresh()
|
|
|
|
# trigger updater to fetch the delegated metadata
|
|
self.sim.fetch_tracker.metadata.clear()
|
|
updater.get_targetinfo("anything")
|
|
|
|
# assert that local delegated metadata filenames are expected
|
|
local_metadata = os.listdir(self.metadata_dir)
|
|
for fname in roles_to_filenames.values():
|
|
self.assertTrue(fname in local_metadata)
|
|
|
|
# assert that requested URLs are quoted without extension
|
|
exp_calls = [(quoted[:-5], 1) for quoted in roles_to_filenames.values()]
|
|
self.assertListEqual(self.sim.fetch_tracker.metadata, exp_calls)
|
|
|
|
hash_bins_graph = {
|
|
"delegations": DelegationsTestCase(
|
|
delegations=[
|
|
TestDelegation(
|
|
"targets",
|
|
"role1",
|
|
paths=None,
|
|
path_hash_prefixes=["8", "9", "a", "b"],
|
|
),
|
|
TestDelegation(
|
|
"targets",
|
|
"role2",
|
|
paths=None,
|
|
path_hash_prefixes=["0", "1", "2", "3"],
|
|
),
|
|
TestDelegation(
|
|
"targets",
|
|
"role3",
|
|
paths=None,
|
|
path_hash_prefixes=["c", "d", "e", "f"],
|
|
),
|
|
],
|
|
visited_order=["role1", "role2", "role3"],
|
|
),
|
|
}
|
|
|
|
@utils.run_sub_tests_with_dataset(hash_bins_graph)
|
|
def test_hash_bins_graph_traversal(
|
|
self, test_data: DelegationsTestCase
|
|
) -> None:
|
|
"""Test that delegated roles are traversed in the order of appearance
|
|
in the delegator's metadata, using pre-order depth-first search and that
|
|
they correctly refer to the corresponding hash bin prefixes"""
|
|
|
|
try:
|
|
exp_files = [*TOP_LEVEL_ROLE_NAMES, *test_data.visited_order]
|
|
exp_calls = [(role, 1) for role in test_data.visited_order]
|
|
|
|
self._init_repo(test_data)
|
|
self.setup_subtest()
|
|
|
|
updater = self._init_updater()
|
|
# Call explicitly refresh to simplify the expected_calls list
|
|
updater.refresh()
|
|
self.sim.fetch_tracker.metadata.clear()
|
|
# Check that metadata dir contains only top-level roles
|
|
self._assert_files_exist(TOP_LEVEL_ROLE_NAMES)
|
|
|
|
# Looking for a non-existing targetpath forces updater
|
|
# to visit a correspondning delegated role
|
|
targetfile = updater.get_targetinfo("missingpath")
|
|
self.assertIsNone(targetfile)
|
|
targetfile = updater.get_targetinfo("othermissingpath")
|
|
self.assertIsNone(targetfile)
|
|
targetfile = updater.get_targetinfo("thirdmissingpath")
|
|
self.assertIsNone(targetfile)
|
|
# Check that the delegated roles were visited in the expected
|
|
# order and the corresponding metadata files were persisted
|
|
self.assertListEqual(self.sim.fetch_tracker.metadata, exp_calls)
|
|
self._assert_files_exist(exp_files)
|
|
finally:
|
|
self.teardown_subtest()
|
|
|
|
@dataclass
|
|
class SuccinctRolesTestCase:
|
|
bit_length: int
|
|
target_path: str
|
|
expected_target_bin: str
|
|
|
|
# By setting the bit_length the total number of bins is 2^bit_length.
|
|
# In each test case target_path is a path to a random target we want to
|
|
# fetch and expected_target_bin is the bin we are expecting to visit.
|
|
succinct_bins_graph = {
|
|
"bin amount = 2, target bin index 0": SuccinctRolesTestCase(
|
|
bit_length=1,
|
|
target_path="boo",
|
|
expected_target_bin="bin-0",
|
|
),
|
|
"bin amount = 2, target bin index 1": SuccinctRolesTestCase(
|
|
bit_length=1,
|
|
target_path="too",
|
|
expected_target_bin="bin-1",
|
|
),
|
|
"bin amount = 4, target bin index 0": SuccinctRolesTestCase(
|
|
bit_length=2,
|
|
target_path="foo",
|
|
expected_target_bin="bin-0",
|
|
),
|
|
"bin amount = 4, target bin index 1": SuccinctRolesTestCase(
|
|
bit_length=2,
|
|
target_path="doo",
|
|
expected_target_bin="bin-1",
|
|
),
|
|
"bin amount = 4, target bin index 2": SuccinctRolesTestCase(
|
|
bit_length=2,
|
|
target_path="too",
|
|
expected_target_bin="bin-2",
|
|
),
|
|
"bin amount = 4, target bin index 3": SuccinctRolesTestCase(
|
|
bit_length=2,
|
|
target_path="bar",
|
|
expected_target_bin="bin-3",
|
|
),
|
|
"bin amount = 256, target bin index fc": SuccinctRolesTestCase(
|
|
bit_length=8,
|
|
target_path="bar",
|
|
expected_target_bin="bin-fc",
|
|
),
|
|
}
|
|
|
|
@utils.run_sub_tests_with_dataset(succinct_bins_graph)
|
|
def test_succinct_roles_graph_traversal(
|
|
self, test_data: SuccinctRolesTestCase
|
|
) -> None:
|
|
# Test traversing the delegation tree when succinct roles is used. For a
|
|
# successful traversal all top level metadata files plus the expected
|
|
# bin should exist locally and only one bin must be downloaded.
|
|
|
|
try:
|
|
exp_files = [*TOP_LEVEL_ROLE_NAMES, test_data.expected_target_bin]
|
|
exp_calls = [(test_data.expected_target_bin, 1)]
|
|
|
|
self.sim = RepositorySimulator()
|
|
self.sim.add_succinct_roles("targets", test_data.bit_length, "bin")
|
|
self.sim.update_snapshot()
|
|
|
|
self.setup_subtest()
|
|
|
|
updater = self._init_updater()
|
|
# Call explicitly refresh to simplify the expected_calls list.
|
|
updater.refresh()
|
|
self.sim.fetch_tracker.metadata.clear()
|
|
# Check that metadata dir contains only top-level roles
|
|
self._assert_files_exist(TOP_LEVEL_ROLE_NAMES)
|
|
|
|
# Looking for a non-existing targetpath forces updater
|
|
# to visit a corresponding delegated role.
|
|
targetfile = updater.get_targetinfo(test_data.target_path)
|
|
self.assertIsNone(targetfile)
|
|
|
|
# Check that the delegated roles were visited in the expected
|
|
# order and the corresponding metadata files were persisted.
|
|
self.assertListEqual(self.sim.fetch_tracker.metadata, exp_calls)
|
|
self._assert_files_exist(exp_files)
|
|
|
|
finally:
|
|
self.teardown_subtest()
|
|
|
|
|
|
class TestTargetFileSearch(TestDelegations):
|
|
r"""
|
|
Create a single repository with the following delegations:
|
|
|
|
targets
|
|
*.doc, *md / \ release/*/*
|
|
A B
|
|
release/x/* / \ release/y/*.zip
|
|
C D
|
|
|
|
Test that Updater successfully finds the target files metadata,
|
|
traversing the delegations as expected.
|
|
"""
|
|
|
|
delegations_tree = DelegationsTestCase(
|
|
delegations=[
|
|
TestDelegation("targets", "A", paths=["*.doc", "*.md"]),
|
|
TestDelegation("targets", "B", paths=["releases/*/*"]),
|
|
TestDelegation("B", "C", paths=["releases/x/*"]),
|
|
TestDelegation("B", "D", paths=["releases/y/*.zip"]),
|
|
],
|
|
target_files=[
|
|
TestTarget("targets", b"targetfile content", "targetfile"),
|
|
TestTarget("A", b"README by A", "README.md"),
|
|
TestTarget("C", b"x release by C", "releases/x/x_v1"),
|
|
TestTarget("D", b"y release by D", "releases/y/y_v1.zip"),
|
|
TestTarget("D", b"z release by D", "releases/z/z_v1.zip"),
|
|
],
|
|
)
|
|
|
|
def setUp(self) -> None:
|
|
super().setUp()
|
|
self._init_repo(self.delegations_tree)
|
|
|
|
# fmt: off
|
|
targets = {
|
|
"no delegations":
|
|
TargetTestCase("targetfile", True, []),
|
|
"targetpath matches wildcard":
|
|
TargetTestCase("README.md", True, ["A"]),
|
|
"targetpath with separators x":
|
|
TargetTestCase("releases/x/x_v1", True, ["B", "C"]),
|
|
"targetpath with separators y":
|
|
TargetTestCase("releases/y/y_v1.zip", True, ["B", "D"]),
|
|
"targetpath is not delegated by all roles in the chain":
|
|
TargetTestCase("releases/z/z_v1.zip", False, ["B"]),
|
|
}
|
|
# fmt: on
|
|
|
|
@utils.run_sub_tests_with_dataset(targets)
|
|
def test_targetfile_search(self, test_data: TargetTestCase) -> None:
|
|
try:
|
|
self.setup_subtest()
|
|
exp_files = [*TOP_LEVEL_ROLE_NAMES, *test_data.visited_order]
|
|
exp_calls = [(role, 1) for role in test_data.visited_order]
|
|
exp_target = self.sim.target_files[test_data.targetpath].target_file
|
|
|
|
updater = self._init_updater()
|
|
# Call explicitly refresh to simplify the expected_calls list
|
|
updater.refresh()
|
|
self.sim.fetch_tracker.metadata.clear()
|
|
target = updater.get_targetinfo(test_data.targetpath)
|
|
if target is not None:
|
|
# Confirm that the expected TargetFile is found
|
|
self.assertTrue(test_data.found)
|
|
self.assertDictEqual(target.to_dict(), exp_target.to_dict())
|
|
else:
|
|
self.assertFalse(test_data.found)
|
|
# Check that the delegated roles were visited in the expected
|
|
# order and the corresponding metadata files were persisted
|
|
self.assertListEqual(self.sim.fetch_tracker.metadata, exp_calls)
|
|
self._assert_files_exist(exp_files)
|
|
finally:
|
|
self.teardown_subtest()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
if "--dump" in sys.argv:
|
|
TestDelegations.dump_dir = tempfile.mkdtemp()
|
|
print(f"Repository Simulator dumps in {TestDelegations.dump_dir}")
|
|
sys.argv.remove("--dump")
|
|
|
|
utils.configure_test_logging(sys.argv)
|
|
unittest.main()
|