Merge pull request #2241 from jku/repository-lib-uploader

Examples: Add repository uploader
This commit is contained in:
Lukas Pühringer 2023-02-08 10:30:52 +01:00 committed by GitHub
commit dd855b1fca
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 457 additions and 33 deletions

View file

@ -1,5 +1,6 @@
# Usage examples
* [repository](repository)
* [client](client_example)
* [repository built with low-level Metadata API](manual_repo)
* [client](client)
* [uploader tool](uploader)
* [Low-level Metadata API examples](manual_repo)

View file

@ -9,7 +9,11 @@ This TUF Repository Application Example has the following features:
- Serves metadata and targets on localhost (default port 8001)
- Simulates a live repository by automatically adding a new target
file every 10 seconds.
- Exposes a small API for the [uploader tool example](../uploader/). API POST endpoints are:
- `/api/role/<ROLE>`: For uploading new delegated targets metadata. Payload
is new version of ROLEs metadata
- `/api/delegation/<ROLE>`: For modifying or creating a delegation for ROLE.
Payload is a dict with one keyid:Key pair
### Usage
@ -18,4 +22,4 @@ This TUF Repository Application Example has the following features:
```
Your repository is now running and is accessible on localhost, See e.g.
http://127.0.0.1:8001/metadata/1.root.json. The
[client example](../client_example/README.md) uses this address by default.
[client example](../client/README.md) uses this address by default.

View file

@ -4,15 +4,19 @@
"""Simple example of using the repository library to build a repository"""
import copy
import json
import logging
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, List
from securesystemslib import keys
from securesystemslib.signer import Signer, SSlibKey, SSlibSigner
from securesystemslib.signer import Key, Signer, SSlibKey, SSlibSigner
from tuf.api.exceptions import RepositoryError
from tuf.api.metadata import (
DelegatedRole,
Delegations,
Metadata,
MetaFile,
Root,
@ -116,7 +120,7 @@ def close(self, role: str, md: Metadata) -> None:
self._targets_infos[f"{role}.json"].version = md.signed.version
def add_target(self, path: str, content: str) -> None:
"""Add a target to repository"""
"""Add a target to top-level targets metadata"""
data = bytes(content, "utf-8")
# add content to cache for serving to clients
@ -131,3 +135,73 @@ def add_target(self, path: str, content: str) -> None:
# update snapshot, timestamp
self.snapshot()
self.timestamp()
def submit_delegation(self, rolename: str, data: bytes) -> bool:
"""Add a delegation to a (offline signed) delegated targets metadata"""
try:
logger.debug("Processing new delegation to role %s", rolename)
keyid, keydict = next(iter(json.loads(data).items()))
key = Key.from_dict(keyid, keydict)
# add delegation and key
role = DelegatedRole(rolename, [], 1, True, [f"{rolename}/*"])
with self.edit("targets") as targets:
if targets.delegations is None:
targets.delegations = Delegations({}, {})
targets.delegations.roles[rolename] = role
targets.add_key(key, rolename)
except (RepositoryError, json.JSONDecodeError) as e:
logger.info("Failed to add delegation for %s: %s", rolename, e)
return False
logger.debug("Targets v%d", targets.version)
# update snapshot, timestamp
self.snapshot()
self.timestamp()
return True
def submit_role(self, role: str, data: bytes) -> bool:
"""Add a new version of a delegated roles metadata"""
try:
logger.debug("Processing new version for role %s", role)
if role in ["root", "snapshot", "timestamp", "targets"]:
raise ValueError("Only delegated targets are accepted")
md = Metadata.from_bytes(data)
for targetpath in md.signed.targets:
if not targetpath.startswith(f"{role}/"):
raise ValueError(f"targets allowed under {role}/ only")
targets_md = self.role_cache["targets"][-1]
targets_md.verify_delegate(role, md)
if role in self.role_cache:
current_md = self.role_cache[role][-1]
current_ver = current_md.signed.version
else:
current_ver = 0
if md.signed.version != current_ver + 1:
raise ValueError("Invalid version {md.signed.version}")
except (RepositoryError, ValueError) as e:
logger.info("Failed to add new version for %s: %s", role, e)
return False
# Checks passed: Add new delegated role version
self.role_cache[role].append(md)
self._targets_infos[f"{role}.json"].version = md.signed.version
logger.debug("%s v%d", role, md.signed.version)
# To keep it simple, target content is generated from targetpath
for targetpath in md.signed.targets:
self.target_cache[targetpath] = bytes(f"{targetpath}", "utf-8")
# update snapshot, timestamp
self.snapshot()
self.timestamp()
return True

View file

@ -13,25 +13,61 @@ import argparse
import logging
import sys
from datetime import datetime
from http.server import BaseHTTPRequestHandler, HTTPServer
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from time import time
from typing import Dict, List
from _simplerepo import SimpleRepository
from tuf.api.serialization.json import JSONSerializer
logger = logging.getLogger(__name__)
class ReqHandler(BaseHTTPRequestHandler):
"""HTTP handler to serve metadata and targets from a SimpleRepository"""
"""HTTP handler for the repository example application
Serves metadata, targets and a small upload API using a SimpleRepository
"""
def do_POST(self):
"""Handle POST requests, aka the 'uploader API'"""
content_len = int(self.headers.get("content-length", 0))
data = self.rfile.read(content_len)
if self.path.startswith("/api/delegation/"):
role = self.path[len("/api/delegation/") :]
if not self.server.repo.submit_delegation(role, data):
return self.send_error(400, f"Failed to delegate to {role}")
elif self.path.startswith("/api/role/"):
role = self.path[len("/api/role/") :]
if not self.server.repo.submit_role(role, data):
return self.send_error(400, f"Failed to submit role {role}")
else:
return self.send_error(404)
self.send_response(200)
self.end_headers()
def do_GET(self):
"""Handle GET: metadata and target files"""
data = None
if self.path.startswith("/metadata/") and self.path.endswith(".json"):
self.get_metadata(self.path[len("/metadata/") : -len(".json")])
data = self.get_metadata(
self.path[len("/metadata/") : -len(".json")]
)
elif self.path.startswith("/targets/"):
self.get_target(self.path[len("/targets/") :])
data = self.get_target(self.path[len("/targets/") :])
if data is None:
self.send_error(404)
else:
self.send_error(404, "Only serving /metadata/*.json")
self.send_response(200)
self.send_header("Content-length", len(data))
self.end_headers()
self.wfile.write(data)
def get_metadata(self, ver_and_role: str):
repo = self.server.repo
@ -44,35 +80,28 @@ class ReqHandler(BaseHTTPRequestHandler):
ver = int(ver_str)
if role not in repo.role_cache or ver > len(repo.role_cache[role]):
self.send_error(404, f"Role {role} version {ver} not found")
return
return None
# send the metadata json
data = repo.role_cache[role][ver - 1].to_bytes()
self.send_response(200)
self.send_header("Content-length", len(data))
self.end_headers()
self.wfile.write(data)
# return metadata
return repo.role_cache[role][ver - 1].to_bytes(JSONSerializer())
def get_target(self, targetpath: str):
repo: SimpleRepository = self.server.repo
_hash, _, target = targetpath.partition(".")
repo = self.server.repo
# unimplement the dumb hashing scheme
# TODO: maybe use hashed paths as the target_cache key
dir, sep, hashname = targetpath.rpartition("/")
_, _, name = hashname.partition(".")
target = f"{dir}{sep}{name}"
if target not in repo.target_cache:
self.send_error(404, f"target {targetpath} not found")
return
# TODO: check that hash actually matches -- or use hash.targetpath as target_cache keys?
return None
# send the target content
data = repo.target_cache[target]
self.send_response(200)
self.send_header("Content-length", len(data))
self.end_headers()
self.wfile.write(data)
return repo.target_cache[target]
class RepositoryServer(HTTPServer):
class RepositoryServer(ThreadingHTTPServer):
def __init__(self, port: int):
super().__init__(("127.0.0.1", port), ReqHandler)
self.timeout = 1

View file

@ -0,0 +1,44 @@
# TUF Uploader Tool Example
:warning: This example uses the repository module which is not considered
part of the python-tuf stable API quite yet.
This is an example maintainer tool: It makes it possible to add delegations to
a remote repository, and then to upload delegated metadata to the repository.
Features:
- Initialization (much like the [client example](../client/))
- Claim delegation: this uses "unsafe repository API" in the sense that the
uploader sends repository unsigned data. This operation can be
compared to claiming a project name on PyPI.org
- Add targetfile: Here uploader uses signing keys that were added to the
delegation in the previous step to create a new version of the delegated
metadata. The repository will verify signatures on this metadata.
The used TUF repository can be set with `--url` (default repository is
"http://127.0.0.1:8001" which is also the default for the repository example).
In practice the uploader tool is only useful with the repository example.
### Usage with the repository example
In one terminal, run the [repository example](../repository/) and leave it running:
```console
examples/repository/repo
```
In another terminal, run uploader:
```console
# Initialize with Trust-On-First-Use
./uploader tofu
# Then claim a delegation for yourself (this also creates a new signing key):
./uploader add-delegation myrole
# Then add a new downloadable target file to your delegated role (to keep the
# example simple, the target file content is always the targetpath):
./uploader add-target myrole myrole/mytargetfile
```
At this point "myrole/mytargetfile" is downloadable from the repository
with the [client example](../client/).

View file

@ -0,0 +1,132 @@
# Copyright 2021-2022 python-tuf contributors
# SPDX-License-Identifier: MIT OR Apache-2.0
"""A Repository implementation for maintainer and developer tools"""
import copy
import json
import logging
import os
from datetime import datetime, timedelta
from typing import Dict
import requests
from securesystemslib import keys
from securesystemslib.signer import SSlibKey, SSlibSigner
from tuf.api.exceptions import RepositoryError
from tuf.api.metadata import Metadata, MetaFile, TargetFile, Targets
from tuf.api.serialization.json import JSONSerializer
from tuf.ngclient import Updater
from tuf.repository import Repository
logger = logging.getLogger(__name__)
class LocalRepository(Repository):
"""A repository implementation that fetches data from a remote repository
This implementation fetches metadata from a remote repository, potentially
creates new versions of metadata, and submits to the remote repository.
ngclient Updater is used to fetch metadata from remote server: this is good
because we want to make sure the metadata we modify is verified, but also
bad because we need some hacks to access the Updaters metadata.
"""
expiry_period = timedelta(days=1)
def __init__(self, metadata_dir: str, key_dir: str, base_url: str):
self.key_dir = key_dir
if not os.path.isdir(self.key_dir):
os.makedirs(self.key_dir)
self.base_url = base_url
self.updater = Updater(
metadata_dir=metadata_dir,
metadata_base_url=f"{base_url}/metadata/",
)
self.updater.refresh()
@property
def targets_infos(self) -> Dict[str, MetaFile]:
raise NotImplementedError # we never call snapshot
@property
def snapshot_info(self) -> MetaFile:
raise NotImplementedError # we never call timestamp
def open(self, role: str) -> Metadata:
"""Return cached (or fetched) metadata"""
# if there is a metadata version fetched from remote, use that
# HACK: access Updater internals
# pylint: disable=protected-access
if role in self.updater._trusted_set:
return copy.deepcopy(self.updater._trusted_set[role])
# otherwise we're creating metadata from scratch
md = Metadata(Targets())
# this makes version bumping in close() simpler
md.signed.version = 0
return md
def close(self, role: str, md: Metadata) -> None:
"""Store a version of metadata. Handle version bumps, expiry, signing"""
md.signed.version += 1
md.signed.expires = datetime.utcnow() + self.expiry_period
with open(f"{self.key_dir}/{role}", "rt", encoding="utf-8") as f:
signer = SSlibSigner(json.loads(f.read()))
md.sign(signer, append=False)
# Upload using "api/role"
uri = f"{self.base_url}/api/role/{role}"
r = requests.post(uri, data=md.to_bytes(JSONSerializer()), timeout=5)
r.raise_for_status()
def add_target(self, role: str, targetpath: str) -> bool:
"""Add target to roles metadata and submit new metadata version"""
# HACK: make sure we have the roles metadata in updater._trusted_set
# (or that we're publishing the first version)
try:
self.updater.get_targetinfo(targetpath)
except RepositoryError:
# HACK Assume this is because we're just publishing version 1
# (so the roles metadata does not exist on server yet)
pass
data = bytes(targetpath, "utf-8")
targetfile = TargetFile.from_data(targetpath, data)
try:
with self.edit(role) as delegated:
delegated.targets[targetpath] = targetfile
except Exception as e: # pylint: disable=broad-except
print(f"Failed to submit new {role} with added target: {e}")
return False
print(f"Uploaded role {role} v{delegated.version}")
return True
def add_delegation(self, role: str) -> bool:
"""Use the (unauthenticated) delegation adding API endpoint"""
keydict = keys.generate_ed25519_key()
pubkey = SSlibKey.from_securesystemslib_key(keydict)
data = {pubkey.keyid: pubkey.to_dict()}
url = f"{self.base_url}/api/delegation/{role}"
r = requests.post(url, data=json.dumps(data), timeout=5)
if r.status_code != 200:
print(f"delegation failed with {r}")
return False
# Store the private key using rolename as filename
with open(f"{self.key_dir}/{role}", "wt", encoding="utf-8") as f:
f.write(json.dumps(keydict))
print(f"Uploaded new delegation, stored key in {self.key_dir}/{role}")
return True

140
examples/uploader/uploader Executable file
View file

@ -0,0 +1,140 @@
#!/usr/bin/env python
# Copyright 2021-2022 python-tuf contributors
# SPDX-License-Identifier: MIT OR Apache-2.0
"""Simple uploader tool example
Uploader is a maintainer application that communicates with the repository
example. Uploader controls offline signing keys and produces signed metadata
that it sends to the repository application so that the metadata can be added
to the repository.
"""
import argparse
import logging
import os
import sys
from hashlib import sha256
from pathlib import Path
from typing import List, Optional
from urllib import request
from _localrepo import LocalRepository
logger = logging.getLogger(__name__)
def build_metadata_dir(base_url: str) -> str:
"""build a unique and reproducible metadata dirname for the repo url"""
name = sha256(base_url.encode()).hexdigest()[:8]
# TODO: Make this not windows hostile?
return f"{Path.home()}/.local/share/tuf-upload-example/{name}"
def build_key_dir(base_url: str) -> str:
"""build a unique and reproducible private key dir for the repository url"""
name = sha256(base_url.encode()).hexdigest()[:8]
# TODO: Make this not windows hostile?
return f"{Path.home()}/.config/tuf-upload-example/{name}"
def init_tofu(base_url: str) -> bool:
"""Initialize local trusted metadata (Trust-On-First-Use)"""
metadata_dir = build_metadata_dir(base_url)
if not os.path.isdir(metadata_dir):
os.makedirs(metadata_dir)
root_url = f"{base_url}/metadata/1.root.json"
try:
request.urlretrieve(root_url, f"{metadata_dir}/root.json")
except OSError:
print(f"Failed to download initial root from {root_url}")
return False
print(f"Trust-on-First-Use: Initialized new root in {metadata_dir}")
return True
def init(base_url: str) -> Optional[LocalRepository]:
"""Initialize a LocalRepository: local root.json must already exist"""
metadata_dir = build_metadata_dir(base_url)
keydir = build_key_dir(base_url)
if not os.path.isfile(f"{metadata_dir}/root.json"):
print(
"Trusted local root not found. Use 'tofu' command to "
"Trust-On-First-Use or copy trusted root metadata to "
f"{metadata_dir}/root.json"
)
return None
print(f"Using trusted root in {metadata_dir}")
return LocalRepository(metadata_dir, keydir, base_url)
def main(argv: List[str]) -> None:
"""Example uploader tool"""
parser = argparse.ArgumentParser()
parser.add_argument("-v", "--verbose", action="count", default=0)
parser.add_argument(
"-u",
"--url",
help="Base repository URL",
default="http://127.0.0.1:8001",
)
subparsers = parser.add_subparsers(dest="sub_command")
tofu_cmd = subparsers.add_parser(
"tofu",
help="Initialize client with Trust-On-First-Use",
)
add_delegation_cmd = subparsers.add_parser(
"add-delegation",
help="Create a delegation and signing key",
)
add_delegation_cmd.add_argument("rolename")
add_target_cmd = subparsers.add_parser(
"add-target",
help="Add a target to a delegated role",
)
add_target_cmd.add_argument("rolename")
add_target_cmd.add_argument("targetpath")
args = parser.parse_args()
if args.verbose == 0:
loglevel = logging.ERROR
elif args.verbose == 1:
loglevel = logging.WARNING
elif args.verbose == 2:
loglevel = logging.INFO
else:
loglevel = logging.DEBUG
logging.basicConfig(level=loglevel)
if args.sub_command == "tofu":
if not init_tofu(args.url):
return "Failed to initialize local repository"
elif args.sub_command == "add-delegation":
repo = init(args.url)
if not repo:
return "Failed to initialize"
if not repo.add_delegation(args.rolename):
return "Failed to add delegation"
elif args.sub_command == "add-target":
repo = init(args.url)
if not repo:
return "Failed to initialize"
if not repo.add_target(args.rolename, args.targetpath):
return "Failed to add target"
else:
parser.print_help()
if __name__ == "__main__":
sys.exit(main(sys.argv))

View file

@ -33,8 +33,8 @@
the same time is not supported.
A simple example of using the Updater to implement a Python TUF client that
downloads target files is available in `examples/client_example
<https://github.com/theupdateframework/python-tuf/tree/develop/examples/client_example>`_.
downloads target files is available in `examples/client
<https://github.com/theupdateframework/python-tuf/tree/develop/examples/client>`_.
"""
import logging