Examples: Add repository application example

This uses the repository module to create an app that
* generates everything from scratch
* serves metadata and targets from memory
* simulates a live repository by adding new targets every few seconds

Signed-off-by: Jussi Kukkonen <jkukkonen@google.com>
This commit is contained in:
Jussi Kukkonen 2022-11-24 17:06:33 +02:00
parent 5e17617fc5
commit 314efaf3da
2 changed files with 231 additions and 0 deletions

View file

@ -0,0 +1,121 @@
# Copyright 2021-2022 python-tuf contributors
# SPDX-License-Identifier: MIT OR Apache-2.0
"""Simple example of using the repository library to build a repository"""
import copy
import logging
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, List
from securesystemslib import keys
from securesystemslib.signer import Signer, SSlibSigner
from tuf.api.metadata import (
Key,
Metadata,
MetaFile,
Root,
Snapshot,
TargetFile,
Targets,
Timestamp,
)
from tuf.repository import Repository
logger = logging.getLogger(__name__)
_signed_init = {
Root.type: Root,
Snapshot.type: Snapshot,
Targets.type: Targets,
Timestamp.type: Timestamp,
}
class SimpleRepository(Repository):
"""Very simple in-memory repository implementation
This repository keeps the metadata for all versions of all roles in memory.
It also keeps all target content in memory.
Attributes:
role_cache: Contains every historical metadata version of every role in
this repositorys. Keys are rolenames and values are lists of
Metadata
signer_cache: Contains all signers available to the repository. Keys
are rolenames, values are lists of signers
target_cache:
"""
expiry_period = timedelta(days=1)
def __init__(self) -> None:
# all versions of all metadata
self.role_cache: Dict[str, List[Metadata]] = defaultdict(list)
# all current keys
self.signer_cache: Dict[str, List[Signer]] = defaultdict(list)
# all target content
self.target_cache: Dict[str, bytes] = {}
# setup a basic repository, generate signing key per top-level role
with self.edit("root", init=True) as root:
for role in ["root", "timestamp", "snapshot", "targets"]:
key = keys.generate_ed25519_key()
self.signer_cache[role].append(SSlibSigner(key))
root.add_key(Key.from_securesystemslib_key(key), role)
for role in ["timestamp", "snapshot", "targets"]:
with self.edit(role, init=True):
pass
def open(self, role: str, init: bool = False) -> Metadata:
"""Return current Metadata for role from 'storage' (or create a new one)"""
if init:
signed_init = _signed_init.get(role, Targets)
md = Metadata(signed_init())
# this makes version bumping in close() simpler
md.signed.version = 0
return md
# return latest metadata from storage (but don't return a reference)
return copy.deepcopy(self.role_cache[role][-1])
def close(self, role: str, md: Metadata, sign_only: bool = False) -> None:
"""Store a version of metadata. Handle version bumps, expiry, signing"""
if sign_only:
for signer in self.signer_cache[role]:
md.sign(signer, append=True)
self.role_cache[role][-1] = md
else:
md.signed.version += 1
md.signed.expires = datetime.utcnow() + self.expiry_period
md.signatures.clear()
for signer in self.signer_cache[role]:
md.sign(signer, append=True)
self.role_cache[role].append(md)
def add_target(self, path: str, content: str) -> None:
"""Add a target to repository"""
data = bytes(content, "utf-8")
# add content to cache for serving to clients
self.target_cache[path] = data
# add a target in the targets metadata
with self.edit("targets") as targets:
targets.targets[path] = TargetFile.from_data(path, data)
logger.debug("Targets v%d", targets.version)
# update snapshot, timestamp
meta = {"targets.json": MetaFile(targets.version)}
new_version, _ = self.snapshot(meta)
if new_version is not None:
self.timestamp(MetaFile(new_version))

110
examples/repository/repo Executable file
View file

@ -0,0 +1,110 @@
#!/usr/bin/env python
# Copyright 2021-2022 python-tuf contributors
# SPDX-License-Identifier: MIT OR Apache-2.0
"""Simple repository example application
The application stores metadata and targets in memory, and serves them via http.
* Keys are generated at startup
* The application simulates a live reposittory by adding a new target every few seconds
"""
import argparse
import logging
import sys
from datetime import datetime
from http.server import BaseHTTPRequestHandler, HTTPServer
from time import time
from typing import Dict, List
from _simplerepo import SimpleRepository
logger = logging.getLogger(__name__)
class ReqHandler(BaseHTTPRequestHandler):
"""HTTP handler to serve metadata and targets from a SimpleRepository"""
def do_GET(self):
if self.path.startswith("/metadata/") and self.path.endswith(".json"):
self.get_metadata(self.path[len("/metadata/"):-len(".json")])
elif self.path.startswith("/targets/"):
self.get_target(self.path[len("/targets/"):])
else:
self.send_error(404, "Only serving /metadata/*.json")
def get_metadata(self, ver_and_role: str):
repo = self.server.repo
ver_str, sep, role = ver_and_role.rpartition(".")
if sep == "":
# 0 will lead to list lookup with -1, meaning latest version
ver = 0
else:
ver = int(ver_str)
if role not in repo.role_cache or ver > len(repo.role_cache[role]):
self.send_error(404, f"Role {role} version {ver} not found")
return
# send the metadata json
data = repo.role_cache[role][ver-1].to_bytes()
self.send_response(200)
self.send_header('Content-length', len(data))
self.end_headers()
self.wfile.write(data)
def get_target(self, targetpath: str):
repo: SimpleRepository = self.server.repo
_hash, _, target = targetpath.partition(".")
if target not in repo.target_cache:
self.send_error(404, f"target {targetpath} not found")
return
# TODO: check that hash actually matches -- or use hash.targetpath as target_cache keys?
# send the target content
data = repo.target_cache[target]
self.send_response(200)
self.send_header('Content-length', len(data))
self.end_headers()
self.wfile.write(data)
class RepositoryServer(HTTPServer):
def __init__(self, port: int):
super().__init__(("127.0.0.1", port), ReqHandler)
self.timeout = 1
self.repo = SimpleRepository()
def main(argv: List[str]) -> None:
"""Example repository server"""
parser = argparse.ArgumentParser()
parser.add_argument("-v", "--verbose", action="count")
parser.add_argument("-p", "--port", type=int, default=8001)
args, _ = parser.parse_known_args(argv)
level = logging.DEBUG if args.verbose else logging.INFO
logging.basicConfig(level=level)
server = RepositoryServer(args.port)
last_change = 0
counter = 0
logger.info(f"Now serving. Root v1 at http://127.0.0.1:{server.server_port}/metadata/1.root.json")
while True:
# Simulate a live repository: Add a new target file every few seconds
if time() - last_change > 10:
last_change = int(time())
counter += 1
content = str(datetime.fromtimestamp(last_change))
server.repo.add_target(f"file{str(counter)}.txt", content)
server.handle_request()
if __name__ == "__main__":
main(sys.argv)