From 4e5980e89dd179c9eb98747312ffba57da2db947 Mon Sep 17 00:00:00 2001 From: Jussi Kukkonen Date: Thu, 2 Sep 2021 18:44:28 +0300 Subject: [PATCH] tests: Start testing ngclient with repo simulator Signed-off-by: Jussi Kukkonen --- tests/repository_simulator.py | 163 +++++++++++++++++++++++++++ tests/test_updater_with_simulator.py | 68 +++++++++++ 2 files changed, 231 insertions(+) create mode 100644 tests/repository_simulator.py create mode 100644 tests/test_updater_with_simulator.py diff --git a/tests/repository_simulator.py b/tests/repository_simulator.py new file mode 100644 index 00000000..24f8ea3a --- /dev/null +++ b/tests/repository_simulator.py @@ -0,0 +1,163 @@ +#!/usr/bin/env python + +# Copyright 2021, New York University and the TUF contributors +# SPDX-License-Identifier: MIT OR Apache-2.0 + +""""Test utility to simulate a repository + +RepositorySimulator provides methods to modify repository metadata so that it's +easy to "publish" new repository versions with modified metadata, while serving +the versions to client test code. + +RepositorySimulator implements FetcherInterface so Updaters in tests can use it +as a way to "download" new metadata from remote: in practice no downloading, +network connections or even file access happens as RepositorySimulator serves +everything from memory. +""" + +import logging +from collections import OrderedDict +from datetime import datetime, timedelta +from securesystemslib.keys import generate_ed25519_key +from securesystemslib.signer import SSlibSigner +from tuf.exceptions import FetcherHTTPError +from typing import Dict, Iterator, List, Optional, Tuple +from urllib import parse + +from tuf.api.metadata import( + Key, + Metadata, + MetaFile, + Role, + Root, + SPECIFICATION_VERSION, + Snapshot, + Targets, + Timestamp +) +from tuf.ngclient.fetcher import FetcherInterface + +logger = logging.getLogger(__name__) + +SPEC_VER = ".".join(SPECIFICATION_VERSION) + +class RepositorySimulator(FetcherInterface): + def __init__(self): + # all root versions are stored + self.md_roots: Dict[int, Metadata[Root]] = {} + self.md_timestamp: Metadata[Timestamp] = None + self.md_snapshot: Metadata[Snapshot] = None + self.md_targets: Metadata[Targets] = None + # all targets in one dict + self.md_delegates: Dict[str, Metadata[Targets]] = {} + + self.signers: Dict[str, List[SSlibSigner]] = {} + + self._initialize() + + @property + def root(self) -> Root: + raise NotImplementedError + + @property + def timestamp(self) -> Timestamp: + return self.md_timestamp.signed + + @property + def snapshot(self) -> Snapshot: + return self.md_snapshot.signed + + @property + def targets(self) -> Targets: + return self.md_targets.signed + + def delegates(self) -> Iterator[Tuple[str, Targets]]: + for role, md in self.md_delegates.items(): + yield role, md.signed + + def _create_key(self, role:str) -> Key: + sslib_key = generate_ed25519_key() + if role not in self.signers: + self.signers[role] = [] + self.signers[role].append(SSlibSigner(sslib_key)) + + key = Key.from_securesystemslib_key(sslib_key) + return key + + def _initialize(self): + """Setup a minimal valid repository""" + expiry = datetime.utcnow().replace(microsecond=0) + timedelta(days=30) + + targets = Targets(1, SPEC_VER, expiry, {}, None) + self.md_targets = Metadata(targets, OrderedDict()) + + meta = {"targets.json": MetaFile(targets.version)} + snapshot = Snapshot(1, SPEC_VER, expiry, meta) + self.md_snapshot = Metadata(snapshot, OrderedDict()) + + meta = {"snapshot.json": MetaFile(snapshot.version)} + timestamp = Timestamp(1, SPEC_VER, expiry, meta) + self.md_timestamp = Metadata(timestamp, OrderedDict()) + + keys = {} + roles = {} + for role in ["root", "timestamp", "snapshot", "targets"]: + key = self._create_key(role) + keys[key.keyid] = key + roles[role] = Role([key.keyid], 1) + root = Root(1, SPEC_VER, expiry, keys, roles, True) + self.md_roots[1] = Metadata(root, OrderedDict()) + + def fetch(self, url: str) -> Iterator[bytes]: + spliturl = parse.urlparse(url) + if spliturl.path.startswith("/metadata/"): + parts = spliturl.path[len("/metadata/"):].split(".") + if len(parts) == 3: + version = int(parts[0]) + role = parts[1] + else: + version = None + role = parts[0] + yield self._fetch_metadata (role, version) + else: + raise FetcherHTTPError(f"Unknown path '{spliturl.path}'", 404) + + def _fetch_metadata(self, role: str, version: Optional[int] = None) -> bytes: + if role == "root": + md = self.md_roots.get(version) + elif role == "timestamp": + md = self.md_timestamp + elif role == "snapshot": + md = self.md_snapshot + elif role == "targets": + md = self.md_targets + else: + md = self.md_delegates.get(role) + + if md is None: + raise FetcherHTTPError(f"Unknown role {role}", 404) + + md.signatures.clear() + for signer in self.signers[role]: + md.sign(signer) + + logger.debug("fetched metadata %s version %d", role, md.signed.version) + return md.to_bytes() + + def update_timestamp(self): + self.timestamp.meta["snapshot.json"].version = self.snapshot.version + + self.timestamp.version += 1 + + def update_snapshot(self): + self.snapshot.meta["targets.json"].version = self.targets.version + for role, delegate in self.delegates(): + self.snapshot.meta[f"{role}.json"].version = delegate.version + + self.snapshot.version += 1 + self.update_timestamp() + + def write(self, directory:str): + """Write current repository metadata to a directory""" + raise NotImplementedError + diff --git a/tests/test_updater_with_simulator.py b/tests/test_updater_with_simulator.py new file mode 100644 index 00000000..6e36024c --- /dev/null +++ b/tests/test_updater_with_simulator.py @@ -0,0 +1,68 @@ +#!/usr/bin/env python + +# Copyright 2021, New York University and the TUF contributors +# SPDX-License-Identifier: MIT OR Apache-2.0 + +"""Test ngclient Updater using the repository simulator +""" + +import logging +import os +import sys +import tempfile +import unittest + +from tuf.ngclient import Updater + +from tests import utils +from tests.repository_simulator import RepositorySimulator + +class TestUpdater(unittest.TestCase): + def setUp(self): + self.client_dir = tempfile.TemporaryDirectory() + + # Setup the repository, bootstrap client root.json + self.sim = RepositorySimulator() + with open(os.path.join(self.client_dir.name, "root.json"), "bw") as f: + root = self.sim.download_bytes("https://example.com/metadata/1.root.json", 100000) + f.write(root) + + def _new_updater(self): + return Updater( + self.client_dir.name, + "https://example.com/metadata/", + "https://example.com/targets/", + self.sim + ) + + def test_refresh(self): + # Update top level metadata + updater = self._new_updater() + updater.refresh() + + # TODO compare file contents? + + # New timestamp version + self.sim.update_timestamp() + + updater = self._new_updater() + updater.refresh() + + # TODO compare file contents? + + # New targets version + self.sim.targets.version += 1 + self.sim.update_snapshot() + + updater = self._new_updater() + updater.refresh() + + # TODO compare file contents? + + + def tearDown(self): + self.client_dir.cleanup() + +if __name__ == "__main__": + utils.configure_test_logging(sys.argv) + unittest.main()