python-tuf/tests/test_updater_fetch_target.py
Jussi Kukkonen 6450a3a8ff ngclient: Fail gracefully on missing role
If role is delegated but missing from snapshot, we currently raise a
undocumented KeyError: a generic RepositoryError seems better as callers
are expected to handle it (and adding a more specific error seems
useless as this is a repository software bug, not just expired metadata or
something).

The same check is also done later in TrustedMetadataSet but I think
keeping the check in both is clearest.

Fixes #2195

Signed-off-by: Jussi Kukkonen <jkukkonen@google.com>
2022-11-28 11:20:31 +02:00

234 lines
8.4 KiB
Python

#!/usr/bin/env python
# Copyright 2021, New York University and the TUF contributors
# SPDX-License-Identifier: MIT OR Apache-2.0
"""Test 'Fetch target' from 'Detailed client workflow' as well as
target files storing/loading from cache.
"""
import os
import sys
import tempfile
import unittest
from dataclasses import dataclass
from typing import Optional
from tests import utils
from tests.repository_simulator import RepositorySimulator
from tuf.api.exceptions import RepositoryError
from tuf.api.metadata import DelegatedRole, Delegations
from tuf.ngclient import Updater
@dataclass
class TestTarget:
path: str
content: bytes
encoded_path: str
class TestFetchTarget(unittest.TestCase):
"""Test ngclient downloading and caching target files."""
# set dump_dir to trigger repository state dumps
dump_dir: Optional[str] = None
def setUp(self) -> None:
# pylint: disable-next=consider-using-with
self.temp_dir = tempfile.TemporaryDirectory()
self.metadata_dir = os.path.join(self.temp_dir.name, "metadata")
self.targets_dir = os.path.join(self.temp_dir.name, "targets")
os.mkdir(self.metadata_dir)
os.mkdir(self.targets_dir)
# Setup the repository, bootstrap client root.json
self.sim = RepositorySimulator()
with open(os.path.join(self.metadata_dir, "root.json"), "bw") as f:
f.write(self.sim.signed_roots[0])
if self.dump_dir is not None:
# create test specific dump directory
name = self.id().split(".")[-1]
self.sim.dump_dir = os.path.join(self.dump_dir, name)
os.mkdir(self.sim.dump_dir)
def tearDown(self) -> None:
self.temp_dir.cleanup()
def _init_updater(self) -> Updater:
"""Creates a new updater instance."""
if self.sim.dump_dir is not None:
self.sim.write()
updater = Updater(
self.metadata_dir,
"https://example.com/metadata/",
self.targets_dir,
"https://example.com/targets/",
self.sim,
)
return updater
targets: utils.DataSet = {
"standard case": TestTarget(
path="targetpath",
content=b"target content",
encoded_path="targetpath",
),
"non-asci case": TestTarget(
path="åäö",
content=b"more content",
encoded_path="%C3%A5%C3%A4%C3%B6",
),
"subdirectory case": TestTarget(
path="a/b/c/targetpath",
content=b"dir target content",
encoded_path="a%2Fb%2Fc%2Ftargetpath",
),
}
@utils.run_sub_tests_with_dataset(targets)
def test_fetch_target(self, target: TestTarget) -> None:
path = os.path.join(self.targets_dir, target.encoded_path)
updater = self._init_updater()
# target does not exist yet
self.assertIsNone(updater.get_targetinfo(target.path))
# Add targets to repository
self.sim.targets.version += 1
self.sim.add_target("targets", target.content, target.path)
self.sim.update_snapshot()
updater = self._init_updater()
# target now exists, is not in cache yet
info = updater.get_targetinfo(target.path)
assert info is not None
# Test without and with explicit local filepath
self.assertIsNone(updater.find_cached_target(info))
self.assertIsNone(updater.find_cached_target(info, path))
# download target, assert it is in cache and content is correct
self.assertEqual(path, updater.download_target(info))
self.assertEqual(path, updater.find_cached_target(info))
self.assertEqual(path, updater.find_cached_target(info, path))
with open(path, "rb") as f:
self.assertEqual(f.read(), target.content)
# download using explicit filepath as well
os.remove(path)
self.assertEqual(path, updater.download_target(info, path))
self.assertEqual(path, updater.find_cached_target(info))
self.assertEqual(path, updater.find_cached_target(info, path))
def test_download_targets_with_succinct_roles(self) -> None:
self.sim.add_succinct_roles("targets", 8, "bin")
self.sim.update_snapshot()
assert self.sim.targets.delegations is not None
assert self.sim.targets.delegations.succinct_roles is not None
succinct_roles = self.sim.targets.delegations.succinct_roles
# Add lots of targets with unique data to imitate a real repository.
for i in range(20):
target_name = f"target-{i}"
target_bin = succinct_roles.get_role_for_target(target_name)
self.sim.add_target(
target_bin, bytes(target_name, "utf-8"), target_name
)
# download each target
updater = self._init_updater()
for i in range(20):
target_name = f"target-{i}"
# Verify that the target info was successfully found.
target_info = updater.get_targetinfo(target_name)
assert target_info is not None
target_full_path = updater.download_target(target_info)
# Verify that the target content is the same as the target name.
with open(target_full_path, encoding="utf-8") as target:
self.assertEqual(target.read(), target_name)
def test_invalid_target_download(self) -> None:
target = TestTarget("targetpath", b"content", "targetpath")
# Add target to repository
self.sim.targets.version += 1
self.sim.add_target("targets", target.content, target.path)
self.sim.update_snapshot()
updater = self._init_updater()
info = updater.get_targetinfo(target.path)
assert info is not None
# Corrupt the file content to not match the hash
self.sim.target_files[target.path].data = b"conten@"
with self.assertRaises(RepositoryError):
updater.download_target(info)
# Corrupt the file content to not match the length
self.sim.target_files[target.path].data = b"cont"
with self.assertRaises(RepositoryError):
updater.download_target(info)
# Verify the file is not persisted in cache
self.assertIsNone(updater.find_cached_target(info))
def test_invalid_target_cache(self) -> None:
target = TestTarget("targetpath", b"content", "targetpath")
# Add target to repository
self.sim.targets.version += 1
self.sim.add_target("targets", target.content, target.path)
self.sim.update_snapshot()
# Download the target
updater = self._init_updater()
info = updater.get_targetinfo(target.path)
assert info is not None
path = updater.download_target(info)
self.assertEqual(path, updater.find_cached_target(info))
# Add newer content to the same targetpath
target.content = b"contentv2"
self.sim.targets.version += 1
self.sim.add_target("targets", target.content, target.path)
self.sim.update_snapshot()
# Newer content is detected, old cached version is not used
updater = self._init_updater()
info = updater.get_targetinfo(target.path)
assert info is not None
self.assertIsNone(updater.find_cached_target(info))
# Download target, assert it is in cache and content is the newer
path = updater.download_target(info)
self.assertEqual(path, updater.find_cached_target(info))
with open(path, "rb") as f:
self.assertEqual(f.read(), target.content)
def test_meta_missing_delegated_role(self) -> None:
"""Test a delegation where the role is not part of the snapshot"""
# Add new delegation, update snapshot. Do not add the actual role
role = DelegatedRole("role1", [], 1, True, ["*"])
self.sim.targets.delegations = Delegations({}, roles={role.name: role})
self.sim.update_snapshot()
# assert that RepositoryError is raised when role1 is needed
updater = self._init_updater()
with self.assertRaises(RepositoryError):
updater.get_targetinfo("")
if __name__ == "__main__":
if "--dump" in sys.argv:
TestFetchTarget.dump_dir = tempfile.mkdtemp()
print(f"Repository Simulator dumps in {TestFetchTarget.dump_dir}")
sys.argv.remove("--dump")
utils.configure_test_logging(sys.argv)
unittest.main()