Merge pull request #1685 from lukpueh/repo-examples

Docs: Add repository tutorial based on metadata API
This commit is contained in:
lukpueh 2021-11-29 13:31:16 +01:00 committed by GitHub
commit 51248db173
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 485 additions and 0 deletions

View file

@ -0,0 +1,401 @@
"""
A TUF repository example using the low-level TUF Metadata API.
As 'repository_tool' and 'repository_lib' are being deprecated, repository
metadata must be created and maintained *manually* using the low-level
Metadata API. The example code in this file demonstrates how to
implement similar functionality to that of the legacy 'repository_tool'
and 'repository_lib' until a new repository implementation is available.
Contents:
* creation of top-level metadata
* target file handling
* consistent snapshots
* key management
* top-level delegation and signing thresholds
* target delegation
* in-band and out-of-band metadata signing
* writing and reading metadata files
* root key rotation
NOTE: Metadata files will be written to a 'tmp*'-directory in CWD.
"""
import os
import tempfile
from collections import OrderedDict
from datetime import datetime, timedelta
from pathlib import Path
from securesystemslib.keys import generate_ed25519_key
from securesystemslib.signer import SSlibSigner
from tuf.api.metadata import (
DelegatedRole,
Delegations,
Key,
Metadata,
MetaFile,
Role,
Root,
Snapshot,
TargetFile,
Targets,
Timestamp,
)
from tuf.api.serialization.json import JSONSerializer
def _in(days):
"""Adds 'days' to now and returns datetime object w/o microseconds."""
return datetime.utcnow().replace(microsecond=0) + timedelta(days=days)
# Create top-level metadata
# =========================
# Every TUF repository has at least four roles, i.e. the top-level roles
# 'targets', 'snapshot', 'timestamp' and 'root'. Below we will discuss their
# purpose, show how to create the corresponding metadata, and how to use them
# to provide integrity, consistency and freshness for the files TUF aims to
# protect, i.e. target files.
# Common fields
# -------------
# All roles have the same metadata container format, for which the metadata API
# provides a generic 'Metadata' class. This class has two fields, one for
# cryptographic signatures, i.e. 'signatures', and one for the payload over
# which signatures are generated, i.e. 'signed'. The payload must be an
# instance of either 'Targets', 'Snapshot', 'Timestamp' or 'Root' class. Common
# fields in all of these 'Signed' classes are:
#
# spec_version -- The supported TUF specification version number.
# version -- The metadata version number.
# expires -- The metadata expiry date.
#
# The 'version', which is incremented on each metadata change, is used to
# reference metadata from within other metadata, and thus allows for repository
# consistency in addition to protecting against rollback attacks.
#
# The date the metadata 'expires' protects against freeze attacks and allows
# for implicit key revocation. Choosing an appropriate expiration interval
# depends on the volatility of a role and how easy it is to re-sign them.
# Highly volatile roles (timestamp, snapshot, targets), usually have shorter
# expiration intervals, whereas roles that change less and might use offline
# keys (root, delegating targets) may have longer expiration intervals.
SPEC_VERSION = "1.0.19"
# Define containers for role objects and cryptographic keys created below. This
# allows us to sign and write metadata in a batch more easily.
roles = {}
keys = {}
# Targets (integrity)
# -------------------
# The targets role guarantees integrity for the files that TUF aims to protect,
# i.e. target files. It does so by listing the relevant target files, along
# with their hash and length.
roles["targets"] = Metadata[Targets](
signed=Targets(
version=1, spec_version=SPEC_VERSION, expires=_in(7), targets={}
),
signatures=OrderedDict(),
)
# For the purpose of this example we use the top-level targets role to protect
# the integrity of this very example script. The metadata entry contains the
# hash and length of this file at the local path. In addition, it specifies the
# 'target path', which a client uses to locate the target file relative to a
# configured mirror base URL.
#
# |----base URL---||-------target path-------|
# e.g. tuf-examples.org/repo_example/basic_repo.py
local_path = Path(__file__).resolve()
target_path = f"{local_path.parts[-2]}/{local_path.parts[-1]}"
target_file_info = TargetFile.from_file(target_path, local_path)
roles["targets"].signed.targets[target_path] = target_file_info
# Snapshot (consistency)
# ----------------------
# The snapshot role guarantees consistency of the entire repository. It does so
# by listing all available targets metadata files at their latest version. This
# becomes relevant, when there are multiple targets metadata files in a
# repository and we want to protect the client against mix-and-match attacks.
roles["snapshot"] = Metadata[Snapshot](
Snapshot(
version=1,
spec_version=SPEC_VERSION,
expires=_in(7),
meta={"targets.json": MetaFile(version=1)},
),
OrderedDict(),
)
# Timestamp (freshness)
# ---------------------
# The timestamp role guarantees freshness of the repository metadata. It does
# so by listing the latest snapshot (which in turn lists all the latest
# targets) metadata. A short expiration interval requires the repository to
# regularly issue new timestamp metadata and thus protects the client against
# freeze attacks.
#
# Note that snapshot and timestamp use the same generic wireline metadata
# format. But given that timestamp metadata always has only one entry in its
# 'meta' field, i.e. for the latest snapshot file, the timestamp object
# provides the shortcut 'snapshot_meta'.
roles["timestamp"] = Metadata[Timestamp](
Timestamp(
version=1,
spec_version=SPEC_VERSION,
expires=_in(1),
snapshot_meta=MetaFile(version=1),
),
OrderedDict(),
)
# Root (root of trust)
# --------------------
# The root role serves as root of trust for all top-level roles, including
# itself. It does so by mapping cryptographic keys to roles, i.e. the keys that
# are authorized to sign any top-level role metadata, and signing thresholds,
# i.e. how many authorized keys are required for a given role (see 'roles'
# field). This is called top-level delegation.
#
# In addition, root provides all public keys to verify these signatures (see
# 'keys' field), and a configuration parameter that describes whether a
# repository uses consistent snapshots (see section 'Persist metadata' below
# for more details).
#
# For this example, we generate one 'ed25519' key pair for each top-level role
# using python-tuf's in-house crypto library.
# See https://github.com/secure-systems-lab/securesystemslib for more details
# about key handling, and don't forget to password-encrypt your private keys!
for name in ["targets", "snapshot", "timestamp", "root"]:
keys[name] = generate_ed25519_key()
# Create root metadata object
roles["root"] = Metadata[Root](
signed=Root(
version=1,
spec_version=SPEC_VERSION,
expires=_in(365),
keys={
key["keyid"]: Key.from_securesystemslib_key(key)
for key in keys.values()
},
roles={
role: Role([key["keyid"]], threshold=1)
for role, key in keys.items()
},
consistent_snapshot=True,
),
signatures=OrderedDict(),
)
# NOTE: We only need the public part to populate root, so it is possible to use
# out-of-band mechanisms to generate key pairs and only expose the public part
# to whoever maintains the root role. As a matter of fact, the very purpose of
# signature thresholds is to avoid having private keys all in one place.
# Signature thresholds
# --------------------
# Given the importance of the root role, it is highly recommended to require a
# threshold of multiple keys to sign root metadata. For this example we
# generate another root key (you can pretend it's out-of-band) and increase the
# required signature threshold.
another_root_key = generate_ed25519_key()
roles["root"].signed.add_key(
"root", Key.from_securesystemslib_key(another_root_key)
)
roles["root"].signed.roles["root"].threshold = 2
# Sign top-level metadata (in-band)
# =================================
# In this example we have access to all top-level signing keys, so we can use
# them to create and add a signature for each role metadata.
for name in ["targets", "snapshot", "timestamp", "root"]:
key = keys[roles[name].signed.type]
signer = SSlibSigner(key)
roles[name].sign(signer)
# Persist metadata (consistent snapshot)
# ======================================
# It is time to publish the first set of metadata for a client to safely
# download the target file that we have registered for this example repository.
#
# For the purpose of this example we will follow the consistent snapshot naming
# convention for all metadata. This means that each metadata file, must be
# prefixed with its version number, except for timestamp. The naming convention
# also affects the target files, but we don't cover this in the example. See
# the TUF specification for more details:
# https://theupdateframework.github.io/specification/latest/#writing-consistent-snapshots
#
# Also note that the TUF specification does not mandate a wireline format. In
# this demo we use a non-compact JSON format and store all metadata in
# temporary directory at CWD for review.
PRETTY = JSONSerializer(compact=False)
TMP_DIR = tempfile.mkdtemp(dir=os.getcwd())
for name in ["root", "targets", "snapshot"]:
filename = f"{roles[name].signed.version}.{roles[name].signed.type}.json"
path = os.path.join(TMP_DIR, filename)
roles[name].to_file(path, serializer=PRETTY)
roles["timestamp"].to_file(
os.path.join(TMP_DIR, "timestamp.json"), serializer=PRETTY
)
# Threshold signing (out-of-band)
# ===============================
# As mentioned above, using signature thresholds usually entails that not all
# signing keys for a given role are in the same place. Let's briefly pretend
# this is the case for the second root key we registered above, and we are now
# on that key owner's computer. All the owner has to do is read the metadata
# file, sign it, and write it back to the same file, and this can be repeated
# until the threshold is satisfied.
root_path = os.path.join(TMP_DIR, "1.root.json")
roles["root"].from_file(root_path)
roles["root"].sign(SSlibSigner(another_root_key), append=True)
roles["root"].to_file(root_path, serializer=PRETTY)
# Targets Delegation
# ==================
# Similar to how the root role delegates responsibilities about integrity,
# consistency and freshness to the corresponding top-level roles, a targets
# role may further delegate its responsibility for target files (or a subset
# thereof) to other targets roles. This allows creation of a granular trust
# hierarchy, and further reduces the impact of a single role compromise.
#
# In this example the top-level targets role trusts a new "python-scripts"
# targets role to provide integrity for any target file that ends with ".py".
delegatee_name = "python-scripts"
keys[delegatee_name] = generate_ed25519_key()
# Delegatee
# ---------
# Create a new targets role, akin to how we created top-level targets above, and
# add target file info from above according to the delegatee's responsibility.
roles[delegatee_name] = Metadata[Targets](
signed=Targets(
version=1,
spec_version=SPEC_VERSION,
expires=_in(7),
targets={target_path: target_file_info},
),
signatures=OrderedDict(),
)
# Delegator
# ---------
# Akin to top-level delegation, the delegator expresses its trust in the
# delegatee by authorizing a threshold of cryptographic keys to provide
# signatures for the delegatee metadata. It also provides the corresponding
# public key store.
# The delegation info defined by the delegator further requires the provision
# of a unique delegatee name and constraints about the target files the
# delegatee is responsible for, e.g. a list of path patterns. For details about
# all configuration parameters see
# https://theupdateframework.github.io/specification/latest/#delegations
roles["targets"].signed.delegations = Delegations(
keys={
keys[delegatee_name]["keyid"]: Key.from_securesystemslib_key(
keys[delegatee_name]
)
},
roles=OrderedDict(
[
(
delegatee_name,
DelegatedRole(
name=delegatee_name,
keyids=[keys[delegatee_name]["keyid"]],
threshold=1,
terminating=True,
paths=["*.py"],
),
)
]
),
)
# Remove target file info from top-level targets (delegatee is now responsible)
del roles["targets"].signed.targets[target_path]
# Increase expiry (delegators should be less volatile)
roles["targets"].expires = _in(365)
# Snapshot + Timestamp + Sign + Persist
# -------------------------------------
# In order to publish a new consistent set of metadata, we need to update
# dependent roles (snapshot, timestamp) accordingly, bumping versions of all
# changed metadata.
# Bump targets version
roles["targets"].signed.version += 1
# Update snapshot to account for changed and new targets metadata
roles["snapshot"].signed.meta["targets.json"].version = roles[
"targets"
].signed.version
roles["snapshot"].signed.meta[f"{delegatee_name}.json"] = MetaFile(version=1)
roles["snapshot"].signed.version += 1
# Update timestamp to account for changed snapshot metadata
roles["timestamp"].signed.snapshot_meta.version = roles[
"snapshot"
].signed.version
roles["timestamp"].signed.version += 1
# Sign and write metadata for all changed roles, i.e. all but root
for role_name in ["targets", "python-scripts", "snapshot", "timestamp"]:
signer = SSlibSigner(keys[role_name])
roles[role_name].sign(signer)
# Prefix all but timestamp with version number (see consistent snapshot)
filename = f"{role_name}.json"
if role_name != "timestamp":
filename = f"{roles[role_name].signed.version}.{filename}"
roles[role_name].to_file(os.path.join(TMP_DIR, filename), serializer=PRETTY)
# Root key rotation (recover from a compromise / key loss)
# ========================================================
# TUF makes it easy to recover from a key compromise in-band. Given the trust
# hierarchy through top-level and targets delegation you can easily
# replace compromised or lost keys for any role using the delegating role, even
# for the root role.
# However, since root authorizes its own keys, it always has to be signed with
# both the threshold of keys from the previous version and the threshold of
# keys from the new version. This establishes a trusted line of continuity.
#
# In this example we will replace a root key, and sign a new version of root
# with the threshold of old and new keys. Since one of the previous root keys
# remains in place, it can be used to count towards the old and new threshold.
new_root_key = generate_ed25519_key()
roles["root"].signed.remove_key("root", keys["root"]["keyid"])
roles["root"].signed.add_key(
"root", Key.from_securesystemslib_key(new_root_key)
)
roles["root"].signed.version += 1
roles["root"].signatures.clear()
for key in [keys["root"], another_root_key, new_root_key]:
roles["root"].sign(SSlibSigner(key), append=True)
roles["root"].to_file(
os.path.join(TMP_DIR, f"{roles['root'].signed.version}.root.json"),
serializer=PRETTY,
)

84
tests/test_examples.py Normal file
View file

@ -0,0 +1,84 @@
#!/usr/bin/env python
# Copyright 2020, New York University and the TUF contributors
# SPDX-License-Identifier: MIT OR Apache-2.0
""" Unit tests for 'examples' scripts.
"""
import glob
import os
import shutil
import tempfile
import unittest
from pathlib import Path
class TestRepoExamples(unittest.TestCase):
"""Unit test class for 'repo_example' scripts.
Provides a '_run_example_script' method to run (exec) a script located in
the 'repo_example' directory.
"""
@classmethod
def setUpClass(cls):
"""Locate and cache 'repo_example' dir."""
base = Path(__file__).resolve().parents[1]
cls.repo_examples_dir = base / "examples" / "repo_example"
def setUp(self):
"""Create and change into test dir.
NOTE: Test scripts are expected to create dirs/files in new CWD."""
self.original_cwd = os.getcwd()
self.base_test_dir = os.path.realpath(tempfile.mkdtemp())
os.chdir(self.base_test_dir)
def tearDown(self):
"""Change back to original dir and remove test dir, which may contain
dirs/files the test created at test-time CWD."""
os.chdir(self.original_cwd)
shutil.rmtree(self.base_test_dir)
def _run_script_and_assert_files(self, script_name, filenames_created):
"""Run script in 'repo_example' dir and assert that it created the
files corresponding to the passed filenames inside a 'tmp*' test dir at
CWD."""
script_path = str(self.repo_examples_dir / script_name)
with open(script_path, "rb") as f:
# pylint: disable=exec-used
exec(
compile(f.read(), script_path, "exec"),
{"__file__": script_path},
)
test_dirs = glob.glob("tmp*")
self.assertTrue(
len(test_dirs) == 1, f"expected 1 'tmp*' test dir, got {test_dirs}"
)
test_dir = test_dirs.pop()
for name in filenames_created:
metadata_path = Path(test_dir) / f"{name}"
self.assertTrue(
metadata_path.exists(), f"missing '{metadata_path}' file"
)
def test_basic_repo(self):
"""Run 'basic_repo.py' and assert creation of metadata files."""
self._run_script_and_assert_files(
"basic_repo.py",
[
"1.python-scripts.json",
"1.root.json",
"1.snapshot.json",
"1.targets.json",
"2.root.json",
"2.snapshot.json",
"2.targets.json",
"timestamp.json",
],
)
if __name__ == "__main__":
unittest.main()