mirror of
https://github.com/theupdateframework/python-tuf
synced 2026-05-24 10:08:28 +00:00
Merge pull request #1666 from sechkova/reposim-consistent-snapshot
RepositorySimulator: add non-consistent snapshot support
This commit is contained in:
commit
1b5df4ccfe
3 changed files with 259 additions and 76 deletions
|
|
@ -111,9 +111,12 @@ def __init__(self):
|
|||
# target downloads are served from this dict
|
||||
self.target_files: Dict[str, RepositoryTarget] = {}
|
||||
|
||||
# Whether to compute hashes and legth for meta in snapshot/timestamp
|
||||
# Whether to compute hashes and length for meta in snapshot/timestamp
|
||||
self.compute_metafile_hashes_length = False
|
||||
|
||||
# Enable hash-prefixed target file names
|
||||
self.prefix_targets_with_hash = True
|
||||
|
||||
self.dump_dir = None
|
||||
self.dump_version = 0
|
||||
|
||||
|
|
@ -192,24 +195,32 @@ def fetch(self, url: str) -> Iterator[bytes]:
|
|||
"""Fetches data from the given url and returns an Iterator (or yields
|
||||
bytes).
|
||||
"""
|
||||
if not self.root.consistent_snapshot:
|
||||
raise NotImplementedError("non-consistent snapshot not supported")
|
||||
path = parse.urlparse(url).path
|
||||
if path.startswith("/metadata/") and path.endswith(".json"):
|
||||
# figure out rolename and version
|
||||
ver_and_name = path[len("/metadata/") :][: -len(".json")]
|
||||
# only consistent_snapshot supported ATM: timestamp is special case
|
||||
if ver_and_name == "timestamp":
|
||||
version = None
|
||||
role = "timestamp"
|
||||
else:
|
||||
version, _, role = ver_and_name.partition(".")
|
||||
version, _, role = ver_and_name.partition(".")
|
||||
# root is always version-prefixed while timestamp is always NOT
|
||||
if role == "root" or (
|
||||
self.root.consistent_snapshot and ver_and_name != "timestamp"
|
||||
):
|
||||
version = int(version)
|
||||
else:
|
||||
# the file is not version-prefixed
|
||||
role = ver_and_name
|
||||
version = None
|
||||
|
||||
yield self._fetch_metadata(role, version)
|
||||
elif path.startswith("/targets/"):
|
||||
# figure out target path and hash prefix
|
||||
target_path = path[len("/targets/") :]
|
||||
dir_parts, sep, prefixed_filename = target_path.rpartition("/")
|
||||
prefix, _, filename = prefixed_filename.partition(".")
|
||||
# extract the hash prefix, if any
|
||||
if self.root.consistent_snapshot and self.prefix_targets_with_hash:
|
||||
prefix, _, filename = prefixed_filename.partition(".")
|
||||
else:
|
||||
filename = prefixed_filename
|
||||
prefix = None
|
||||
target_path = f"{dir_parts}{sep}{filename}"
|
||||
|
||||
yield self._fetch_target(target_path, prefix)
|
||||
|
|
@ -257,7 +268,7 @@ def _fetch_metadata(
|
|||
elif role == "targets":
|
||||
md = self.md_targets
|
||||
else:
|
||||
md = self.md_delegates[role]
|
||||
md = self.md_delegates.get(role)
|
||||
|
||||
if md is None:
|
||||
raise FetcherHTTPError(f"Unknown role {role}", 404)
|
||||
|
|
|
|||
237
tests/test_updater_consistent_snapshot.py
Normal file
237
tests/test_updater_consistent_snapshot.py
Normal file
|
|
@ -0,0 +1,237 @@
|
|||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2021, New York University and the TUF contributors
|
||||
# SPDX-License-Identifier: MIT OR Apache-2.0
|
||||
|
||||
"""Test ngclient Updater toggling consistent snapshot"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import tempfile
|
||||
import unittest
|
||||
from typing import Any, Dict, Iterable, List, Optional
|
||||
from unittest.mock import call, patch
|
||||
|
||||
from tests import utils
|
||||
from tests.repository_simulator import RepositorySimulator
|
||||
from tuf.api.metadata import (
|
||||
SPECIFICATION_VERSION,
|
||||
TOP_LEVEL_ROLE_NAMES,
|
||||
Targets,
|
||||
)
|
||||
from tuf.ngclient import Updater
|
||||
|
||||
|
||||
class TestConsistentSnapshot(unittest.TestCase):
|
||||
"""Test different combinations of 'consistent_snapshot' and
|
||||
'prefix_targets_with_hash' and verify that the correct URLs
|
||||
are formed for each combination"""
|
||||
|
||||
def setUp(self) -> None:
|
||||
self.temp_dir = tempfile.TemporaryDirectory()
|
||||
self.metadata_dir = os.path.join(self.temp_dir.name, "metadata")
|
||||
self.targets_dir = os.path.join(self.temp_dir.name, "targets")
|
||||
os.mkdir(self.metadata_dir)
|
||||
os.mkdir(self.targets_dir)
|
||||
|
||||
def tearDown(self) -> None:
|
||||
self.temp_dir.cleanup()
|
||||
|
||||
def _init_repo(
|
||||
self, consistent_snapshot: bool, prefix_targets: bool = True
|
||||
) -> RepositorySimulator:
|
||||
"""Create a new RepositorySimulator instance"""
|
||||
sim = RepositorySimulator()
|
||||
sim.root.consistent_snapshot = consistent_snapshot
|
||||
sim.root.version += 1
|
||||
sim.publish_root()
|
||||
sim.prefix_targets_with_hash = prefix_targets
|
||||
|
||||
# Init trusted root with the latest consistent_snapshot
|
||||
with open(os.path.join(self.metadata_dir, "root.json"), "bw") as f:
|
||||
root = sim.download_bytes(
|
||||
"https://example.com/metadata/2.root.json", 100000
|
||||
)
|
||||
f.write(root)
|
||||
|
||||
return sim
|
||||
|
||||
def _init_updater(self, sim: RepositorySimulator) -> Updater:
|
||||
"""Create a new Updater instance"""
|
||||
return Updater(
|
||||
self.metadata_dir,
|
||||
"https://example.com/metadata/",
|
||||
self.targets_dir,
|
||||
"https://example.com/targets/",
|
||||
sim,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _cleanup_dir(path: str) -> None:
|
||||
"""Delete all files inside a directory"""
|
||||
for filepath in [
|
||||
os.path.join(path, filename) for filename in os.listdir(path)
|
||||
]:
|
||||
os.remove(filepath)
|
||||
|
||||
def _assert_metadata_files_exist(self, roles: Iterable[str]) -> None:
|
||||
"""Assert that local metadata files exist for 'roles'"""
|
||||
local_metadata_files = os.listdir(self.metadata_dir)
|
||||
for role in roles:
|
||||
self.assertIn(f"{role}.json", local_metadata_files)
|
||||
|
||||
def _assert_targets_files_exist(self, filenames: Iterable[str]) -> None:
|
||||
"""Assert that local files with 'filenames' exist"""
|
||||
local_target_files = os.listdir(self.targets_dir)
|
||||
for filename in filenames:
|
||||
self.assertIn(filename, local_target_files)
|
||||
|
||||
top_level_roles_data: utils.DataSet = {
|
||||
"consistent_snaphot disabled": {
|
||||
"consistent_snapshot": False,
|
||||
"calls": [
|
||||
call("root", 3),
|
||||
call("timestamp", None),
|
||||
call("snapshot", None),
|
||||
call("targets", None),
|
||||
],
|
||||
},
|
||||
"consistent_snaphot enabled": {
|
||||
"consistent_snapshot": True,
|
||||
"calls": [
|
||||
call("root", 3),
|
||||
call("timestamp", None),
|
||||
call("snapshot", 1),
|
||||
call("targets", 1),
|
||||
],
|
||||
},
|
||||
}
|
||||
|
||||
@utils.run_sub_tests_with_dataset(top_level_roles_data)
|
||||
def test_top_level_roles_update(self, test_case_data: Dict[str, Any]):
|
||||
# Test if the client fetches and stores metadata files with the
|
||||
# correct version prefix, depending on 'consistent_snapshot' config
|
||||
consistent_snapshot: bool = test_case_data["consistent_snapshot"]
|
||||
expected_calls: List[Any] = test_case_data["calls"]
|
||||
|
||||
sim = self._init_repo(consistent_snapshot)
|
||||
updater = self._init_updater(sim)
|
||||
|
||||
with patch.object(
|
||||
sim, "_fetch_metadata", wraps=sim._fetch_metadata
|
||||
) as wrapped_fetch:
|
||||
updater.refresh()
|
||||
|
||||
# metadata files are fetched with the expected version (or None)
|
||||
self.assertListEqual(wrapped_fetch.call_args_list, expected_calls)
|
||||
# metadata files are always persisted without a version prefix
|
||||
self._assert_metadata_files_exist(TOP_LEVEL_ROLE_NAMES)
|
||||
|
||||
self._cleanup_dir(self.metadata_dir)
|
||||
|
||||
delegated_roles_data: utils.DataSet = {
|
||||
"consistent_snaphot disabled": {
|
||||
"consistent_snapshot": False,
|
||||
"expected_version": None,
|
||||
},
|
||||
"consistent_snaphot enabled": {
|
||||
"consistent_snapshot": True,
|
||||
"expected_version": 1,
|
||||
},
|
||||
}
|
||||
|
||||
@utils.run_sub_tests_with_dataset(delegated_roles_data)
|
||||
def test_delegated_roles_update(self, test_case_data: Dict[str, Any]):
|
||||
# Test if the client fetches and stores delegated metadata files with
|
||||
# the correct version prefix, depending on 'consistent_snapshot' config
|
||||
consistent_snapshot: bool = test_case_data["consistent_snapshot"]
|
||||
expected_version: Optional[int] = test_case_data["expected_version"]
|
||||
rolenames = ["role1", "..", "."]
|
||||
expected_calls = [call(role, expected_version) for role in rolenames]
|
||||
|
||||
sim = self._init_repo(consistent_snapshot)
|
||||
# Add new delegated targets
|
||||
spec_version = ".".join(SPECIFICATION_VERSION)
|
||||
targets = Targets(1, spec_version, sim.safe_expiry, {}, None)
|
||||
for role in rolenames:
|
||||
sim.add_delegation("targets", role, targets, False, ["*"], None)
|
||||
sim.update_snapshot()
|
||||
updater = self._init_updater(sim)
|
||||
updater.refresh()
|
||||
|
||||
with patch.object(
|
||||
sim, "_fetch_metadata", wraps=sim._fetch_metadata
|
||||
) as wrapped_fetch:
|
||||
# trigger updater to fetch the delegated metadata
|
||||
updater.get_targetinfo("anything")
|
||||
# metadata files are fetched with the expected version (or None)
|
||||
self.assertListEqual(wrapped_fetch.call_args_list, expected_calls)
|
||||
# metadata files are always persisted without a version prefix
|
||||
self._assert_metadata_files_exist(rolenames)
|
||||
|
||||
self._cleanup_dir(self.metadata_dir)
|
||||
|
||||
targets_download_data: utils.DataSet = {
|
||||
"consistent_snaphot disabled": {
|
||||
"consistent_snapshot": False,
|
||||
"prefix_targets": True,
|
||||
"hash_algo": None,
|
||||
},
|
||||
"consistent_snaphot enabled without prefixed targets": {
|
||||
"consistent_snapshot": True,
|
||||
"prefix_targets": False,
|
||||
"hash_algo": None,
|
||||
},
|
||||
"consistent_snaphot enabled with prefixed targets": {
|
||||
"consistent_snapshot": True,
|
||||
"prefix_targets": True,
|
||||
"hash_algo": "sha256",
|
||||
},
|
||||
}
|
||||
|
||||
@utils.run_sub_tests_with_dataset(targets_download_data)
|
||||
def test_download_targets(self, test_case_data: Dict[str, Any]):
|
||||
# Test if the client fetches and stores target files with
|
||||
# the correct hash prefix, depending on 'consistent_snapshot'
|
||||
# and 'prefix_targets_with_hash' config
|
||||
consistent_snapshot: bool = test_case_data["consistent_snapshot"]
|
||||
prefix_targets_with_hash: bool = test_case_data["prefix_targets"]
|
||||
hash_algo: Optional[str] = test_case_data["hash_algo"]
|
||||
targetpaths = ["file", "file.txt", "..file.ext", "f.le"]
|
||||
|
||||
sim = self._init_repo(consistent_snapshot, prefix_targets_with_hash)
|
||||
# Add targets to repository
|
||||
for targetpath in targetpaths:
|
||||
sim.targets.version += 1
|
||||
sim.add_target("targets", b"content", targetpath)
|
||||
sim.update_snapshot()
|
||||
|
||||
updater = self._init_updater(sim)
|
||||
updater.config.prefix_targets_with_hash = prefix_targets_with_hash
|
||||
updater.refresh()
|
||||
|
||||
with patch.object(
|
||||
sim, "_fetch_target", wraps=sim._fetch_target
|
||||
) as wrapped_fetch_target:
|
||||
|
||||
for targetpath in targetpaths:
|
||||
info = updater.get_targetinfo(targetpath)
|
||||
updater.download_target(info)
|
||||
expected_prefix = (
|
||||
None if not hash_algo else info.hashes[hash_algo]
|
||||
)
|
||||
# files are fetched with the expected hash prefix (or None)
|
||||
wrapped_fetch_target.assert_called_once_with(
|
||||
info.path, expected_prefix
|
||||
)
|
||||
# target files are always persisted without hash prefix
|
||||
self._assert_targets_files_exist([info.path])
|
||||
wrapped_fetch_target.reset_mock()
|
||||
|
||||
self._cleanup_dir(self.targets_dir)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
utils.configure_test_logging(sys.argv)
|
||||
unittest.main()
|
||||
|
|
@ -131,26 +131,6 @@ def tearDown(self):
|
|||
# Logs stdout and stderr from the sever subprocess.
|
||||
self.server_process_handler.flush_log()
|
||||
|
||||
def _create_consistent_target(
|
||||
self, targetname: str, target_hash: str
|
||||
) -> None:
|
||||
"""Create consistent targets copies of their non-consistent counterparts
|
||||
inside the repository directory.
|
||||
|
||||
Args:
|
||||
targetname: A string denoting the name of the target file.
|
||||
target_hash: A string denoting the hash of the target.
|
||||
|
||||
"""
|
||||
consistent_target_name = f"{target_hash}.{targetname}"
|
||||
source_path = os.path.join(
|
||||
self.repository_directory, "targets", targetname
|
||||
)
|
||||
destination_path = os.path.join(
|
||||
self.repository_directory, "targets", consistent_target_name
|
||||
)
|
||||
shutil.copy(source_path, destination_path)
|
||||
|
||||
def _modify_repository_root(
|
||||
self, modification_func, bump_version=False
|
||||
) -> None:
|
||||
|
|
@ -185,51 +165,6 @@ def _assert_files(self, roles: List[str]):
|
|||
client_files = sorted(os.listdir(self.client_directory))
|
||||
self.assertEqual(client_files, expected_files)
|
||||
|
||||
# pylint: disable=protected-access
|
||||
def test_refresh_on_consistent_targets(self):
|
||||
# Generate a new root version where consistent_snapshot is set to true
|
||||
def consistent_snapshot_modifier(root):
|
||||
root.signed.consistent_snapshot = True
|
||||
|
||||
self._modify_repository_root(
|
||||
consistent_snapshot_modifier, bump_version=True
|
||||
)
|
||||
updater = ngclient.Updater(
|
||||
self.client_directory,
|
||||
self.metadata_url,
|
||||
self.dl_dir,
|
||||
self.targets_url,
|
||||
)
|
||||
|
||||
# All metadata is in local directory already
|
||||
updater.refresh()
|
||||
# Make sure that consistent snapshot is enabled
|
||||
self.assertTrue(updater._trusted_set.root.signed.consistent_snapshot)
|
||||
|
||||
# Get targetinfos, assert cache does not contain the files
|
||||
info1 = updater.get_targetinfo("file1.txt")
|
||||
info3 = updater.get_targetinfo("file3.txt")
|
||||
self.assertIsNone(updater.find_cached_target(info1))
|
||||
self.assertIsNone(updater.find_cached_target(info3))
|
||||
|
||||
# Create consistent targets with file path HASH.FILENAME.EXT
|
||||
target1_hash = list(info1.hashes.values())[0]
|
||||
target3_hash = list(info3.hashes.values())[0]
|
||||
self._create_consistent_target("file1.txt", target1_hash)
|
||||
self._create_consistent_target("file3.txt", target3_hash)
|
||||
|
||||
# Download files, assert that cache has correct files
|
||||
updater.download_target(info1)
|
||||
path = updater.find_cached_target(info1)
|
||||
self.assertEqual(path, os.path.join(self.dl_dir, info1.path))
|
||||
self.assertIsNone(updater.find_cached_target(info3))
|
||||
|
||||
updater.download_target(info3)
|
||||
path = updater.find_cached_target(info1)
|
||||
self.assertEqual(path, os.path.join(self.dl_dir, info1.path))
|
||||
path = updater.find_cached_target(info3)
|
||||
self.assertEqual(path, os.path.join(self.dl_dir, info3.path))
|
||||
|
||||
def test_refresh_and_download(self):
|
||||
# Test refresh without consistent targets - targets without hash prefix.
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue