diff --git a/examples/repository/_simplerepo.py b/examples/repository/_simplerepo.py new file mode 100644 index 00000000..225222a0 --- /dev/null +++ b/examples/repository/_simplerepo.py @@ -0,0 +1,121 @@ +# Copyright 2021-2022 python-tuf contributors +# SPDX-License-Identifier: MIT OR Apache-2.0 + +"""Simple example of using the repository library to build a repository""" + +import copy +import logging +from collections import defaultdict +from datetime import datetime, timedelta +from typing import Dict, List + +from securesystemslib import keys +from securesystemslib.signer import Signer, SSlibSigner + +from tuf.api.metadata import ( + Key, + Metadata, + MetaFile, + Root, + Snapshot, + TargetFile, + Targets, + Timestamp, +) +from tuf.repository import Repository + +logger = logging.getLogger(__name__) + +_signed_init = { + Root.type: Root, + Snapshot.type: Snapshot, + Targets.type: Targets, + Timestamp.type: Timestamp, +} + + +class SimpleRepository(Repository): + """Very simple in-memory repository implementation + + This repository keeps the metadata for all versions of all roles in memory. + It also keeps all target content in memory. + + + Attributes: + role_cache: Contains every historical metadata version of every role in + this repositorys. Keys are rolenames and values are lists of + Metadata + signer_cache: Contains all signers available to the repository. Keys + are rolenames, values are lists of signers + target_cache: + """ + + expiry_period = timedelta(days=1) + + def __init__(self) -> None: + # all versions of all metadata + self.role_cache: Dict[str, List[Metadata]] = defaultdict(list) + # all current keys + self.signer_cache: Dict[str, List[Signer]] = defaultdict(list) + # all target content + self.target_cache: Dict[str, bytes] = {} + + # setup a basic repository, generate signing key per top-level role + with self.edit("root", init=True) as root: + for role in ["root", "timestamp", "snapshot", "targets"]: + key = keys.generate_ed25519_key() + self.signer_cache[role].append(SSlibSigner(key)) + root.add_key(Key.from_securesystemslib_key(key), role) + + for role in ["timestamp", "snapshot", "targets"]: + with self.edit(role, init=True): + pass + + def open(self, role: str, init: bool = False) -> Metadata: + """Return current Metadata for role from 'storage' (or create a new one)""" + + if init: + signed_init = _signed_init.get(role, Targets) + md = Metadata(signed_init()) + + # this makes version bumping in close() simpler + md.signed.version = 0 + return md + + # return latest metadata from storage (but don't return a reference) + return copy.deepcopy(self.role_cache[role][-1]) + + def close(self, role: str, md: Metadata, sign_only: bool = False) -> None: + """Store a version of metadata. Handle version bumps, expiry, signing""" + if sign_only: + for signer in self.signer_cache[role]: + md.sign(signer, append=True) + self.role_cache[role][-1] = md + else: + md.signed.version += 1 + md.signed.expires = datetime.utcnow() + self.expiry_period + + md.signatures.clear() + for signer in self.signer_cache[role]: + md.sign(signer, append=True) + + self.role_cache[role].append(md) + + def add_target(self, path: str, content: str) -> None: + """Add a target to repository""" + data = bytes(content, "utf-8") + + # add content to cache for serving to clients + self.target_cache[path] = data + + # add a target in the targets metadata + with self.edit("targets") as targets: + targets.targets[path] = TargetFile.from_data(path, data) + + logger.debug("Targets v%d", targets.version) + + # update snapshot, timestamp + meta = {"targets.json": MetaFile(targets.version)} + new_version, _ = self.snapshot(meta) + if new_version is not None: + self.timestamp(MetaFile(new_version)) diff --git a/examples/repository/repo b/examples/repository/repo new file mode 100755 index 00000000..1d7d53c8 --- /dev/null +++ b/examples/repository/repo @@ -0,0 +1,110 @@ +#!/usr/bin/env python +# Copyright 2021-2022 python-tuf contributors +# SPDX-License-Identifier: MIT OR Apache-2.0 + +"""Simple repository example application + +The application stores metadata and targets in memory, and serves them via http. +* Keys are generated at startup +* The application simulates a live reposittory by adding a new target every few seconds +""" + +import argparse +import logging +import sys +from datetime import datetime +from http.server import BaseHTTPRequestHandler, HTTPServer +from time import time +from typing import Dict, List + +from _simplerepo import SimpleRepository + +logger = logging.getLogger(__name__) + +class ReqHandler(BaseHTTPRequestHandler): + """HTTP handler to serve metadata and targets from a SimpleRepository""" + + def do_GET(self): + if self.path.startswith("/metadata/") and self.path.endswith(".json"): + self.get_metadata(self.path[len("/metadata/"):-len(".json")]) + elif self.path.startswith("/targets/"): + self.get_target(self.path[len("/targets/"):]) + else: + self.send_error(404, "Only serving /metadata/*.json") + + def get_metadata(self, ver_and_role: str): + repo = self.server.repo + + ver_str, sep, role = ver_and_role.rpartition(".") + if sep == "": + # 0 will lead to list lookup with -1, meaning latest version + ver = 0 + else: + ver = int(ver_str) + + if role not in repo.role_cache or ver > len(repo.role_cache[role]): + self.send_error(404, f"Role {role} version {ver} not found") + return + + # send the metadata json + data = repo.role_cache[role][ver-1].to_bytes() + self.send_response(200) + self.send_header('Content-length', len(data)) + self.end_headers() + self.wfile.write(data) + + def get_target(self, targetpath: str): + repo: SimpleRepository = self.server.repo + _hash, _, target = targetpath.partition(".") + + if target not in repo.target_cache: + self.send_error(404, f"target {targetpath} not found") + return + + # TODO: check that hash actually matches -- or use hash.targetpath as target_cache keys? + + # send the target content + data = repo.target_cache[target] + self.send_response(200) + self.send_header('Content-length', len(data)) + self.end_headers() + self.wfile.write(data) + + +class RepositoryServer(HTTPServer): + def __init__(self, port: int): + super().__init__(("127.0.0.1", port), ReqHandler) + self.timeout = 1 + self.repo = SimpleRepository() + + +def main(argv: List[str]) -> None: + """Example repository server""" + + parser = argparse.ArgumentParser() + parser.add_argument("-v", "--verbose", action="count") + parser.add_argument("-p", "--port", type=int, default=8001) + args, _ = parser.parse_known_args(argv) + + level = logging.DEBUG if args.verbose else logging.INFO + logging.basicConfig(level=level) + + server = RepositoryServer(args.port) + last_change = 0 + counter = 0 + + logger.info(f"Now serving. Root v1 at http://127.0.0.1:{server.server_port}/metadata/1.root.json") + + while True: + # Simulate a live repository: Add a new target file every few seconds + if time() - last_change > 10: + last_change = int(time()) + counter += 1 + content = str(datetime.fromtimestamp(last_change)) + server.repo.add_target(f"file{str(counter)}.txt", content) + + server.handle_request() + + +if __name__ == "__main__": + main(sys.argv)