2021-09-02 15:44:28 +00:00
|
|
|
# Copyright 2021, New York University and the TUF contributors
|
|
|
|
|
# SPDX-License-Identifier: MIT OR Apache-2.0
|
|
|
|
|
|
2024-03-07 08:05:36 +00:00
|
|
|
"""Test utility to simulate a repository
|
2021-09-02 15:44:28 +00:00
|
|
|
|
|
|
|
|
RepositorySimulator provides methods to modify repository metadata so that it's
|
|
|
|
|
easy to "publish" new repository versions with modified metadata, while serving
|
|
|
|
|
the versions to client test code.
|
|
|
|
|
|
|
|
|
|
RepositorySimulator implements FetcherInterface so Updaters in tests can use it
|
|
|
|
|
as a way to "download" new metadata from remote: in practice no downloading,
|
|
|
|
|
network connections or even file access happens as RepositorySimulator serves
|
|
|
|
|
everything from memory.
|
2021-09-10 12:54:56 +00:00
|
|
|
|
|
|
|
|
Metadata and targets "hosted" by the simulator are made available in URL paths
|
|
|
|
|
"/metadata/..." and "/targets/..." respectively.
|
|
|
|
|
|
|
|
|
|
Example::
|
|
|
|
|
|
|
|
|
|
# constructor creates repository with top-level metadata
|
|
|
|
|
sim = RepositorySimulator()
|
|
|
|
|
|
|
|
|
|
# metadata can be modified directly: it is immediately available to clients
|
|
|
|
|
sim.snapshot.version += 1
|
|
|
|
|
|
|
|
|
|
# As an exception, new root versions require explicit publishing
|
|
|
|
|
sim.root.version += 1
|
|
|
|
|
sim.publish_root()
|
|
|
|
|
|
|
|
|
|
# there are helper functions
|
|
|
|
|
sim.add_target("targets", b"content", "targetpath")
|
|
|
|
|
sim.targets.version += 1
|
|
|
|
|
sim.update_snapshot()
|
|
|
|
|
|
|
|
|
|
# Use the simulated repository from an Updater:
|
|
|
|
|
updater = Updater(
|
|
|
|
|
dir,
|
|
|
|
|
"https://example.com/metadata/",
|
2026-01-25 11:50:37 +00:00
|
|
|
dir,
|
2021-09-10 12:54:56 +00:00
|
|
|
"https://example.com/targets/",
|
2026-01-25 11:50:37 +00:00
|
|
|
sim,
|
|
|
|
|
bootstrap=sim.signed_roots[0],
|
2021-09-10 12:54:56 +00:00
|
|
|
)
|
|
|
|
|
updater.refresh()
|
2021-09-02 15:44:28 +00:00
|
|
|
"""
|
|
|
|
|
|
2024-11-29 10:29:32 +00:00
|
|
|
from __future__ import annotations
|
|
|
|
|
|
2022-02-03 22:14:02 +00:00
|
|
|
import datetime
|
2025-03-18 13:49:24 +00:00
|
|
|
import hashlib
|
2021-09-03 13:53:33 +00:00
|
|
|
import logging
|
|
|
|
|
import os
|
|
|
|
|
import tempfile
|
2021-12-03 07:59:07 +00:00
|
|
|
from dataclasses import dataclass, field
|
2024-11-29 10:29:32 +00:00
|
|
|
from typing import TYPE_CHECKING
|
2021-10-12 13:54:10 +00:00
|
|
|
from urllib import parse
|
|
|
|
|
|
2024-04-24 08:36:57 +00:00
|
|
|
from securesystemslib.signer import CryptoSigner, Signer
|
2021-09-02 15:44:28 +00:00
|
|
|
|
2022-01-31 11:56:46 +00:00
|
|
|
from tuf.api.exceptions import DownloadHTTPError
|
2021-09-10 06:30:58 +00:00
|
|
|
from tuf.api.metadata import (
|
2021-10-12 13:54:10 +00:00
|
|
|
SPECIFICATION_VERSION,
|
|
|
|
|
TOP_LEVEL_ROLE_NAMES,
|
2021-09-22 10:04:33 +00:00
|
|
|
DelegatedRole,
|
|
|
|
|
Delegations,
|
2021-09-02 15:44:28 +00:00
|
|
|
Metadata,
|
|
|
|
|
MetaFile,
|
|
|
|
|
Root,
|
|
|
|
|
Snapshot,
|
2022-06-02 17:15:59 +00:00
|
|
|
SuccinctRoles,
|
2021-09-10 12:54:56 +00:00
|
|
|
TargetFile,
|
2021-09-02 15:44:28 +00:00
|
|
|
Targets,
|
2021-09-10 06:30:58 +00:00
|
|
|
Timestamp,
|
2021-09-02 15:44:28 +00:00
|
|
|
)
|
2021-10-12 13:54:10 +00:00
|
|
|
from tuf.api.serialization.json import JSONSerializer
|
2021-09-02 15:44:28 +00:00
|
|
|
from tuf.ngclient.fetcher import FetcherInterface
|
|
|
|
|
|
2024-11-29 10:29:32 +00:00
|
|
|
if TYPE_CHECKING:
|
|
|
|
|
from collections.abc import Iterator
|
|
|
|
|
|
2021-09-02 15:44:28 +00:00
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
SPEC_VER = ".".join(SPECIFICATION_VERSION)
|
|
|
|
|
|
2025-03-19 08:28:01 +00:00
|
|
|
_HASH_ALGORITHM = "sha256"
|
2025-03-18 13:49:24 +00:00
|
|
|
|
2021-10-12 13:14:24 +00:00
|
|
|
|
2021-12-03 07:59:07 +00:00
|
|
|
@dataclass
|
|
|
|
|
class FetchTracker:
|
|
|
|
|
"""Fetcher counter for metadata and targets."""
|
|
|
|
|
|
2024-11-29 10:29:32 +00:00
|
|
|
metadata: list[tuple[str, int | None]] = field(default_factory=list)
|
|
|
|
|
targets: list[tuple[str, str | None]] = field(default_factory=list)
|
2021-12-03 07:59:07 +00:00
|
|
|
|
|
|
|
|
|
2021-09-10 12:54:56 +00:00
|
|
|
@dataclass
|
|
|
|
|
class RepositoryTarget:
|
2021-10-13 16:56:53 +00:00
|
|
|
"""Contains actual target data and the related target metadata."""
|
2021-10-12 13:14:24 +00:00
|
|
|
|
2021-09-10 12:54:56 +00:00
|
|
|
data: bytes
|
|
|
|
|
target_file: TargetFile
|
2021-09-10 06:30:58 +00:00
|
|
|
|
2021-10-12 13:14:24 +00:00
|
|
|
|
2021-09-02 15:44:28 +00:00
|
|
|
class RepositorySimulator(FetcherInterface):
|
2021-10-13 16:56:53 +00:00
|
|
|
"""Simulates a repository that can be used for testing."""
|
|
|
|
|
|
2021-11-18 16:58:16 +00:00
|
|
|
def __init__(self) -> None:
|
2024-11-04 04:21:23 +00:00
|
|
|
self.md_delegates: dict[str, Metadata[Targets]] = {}
|
2021-09-02 15:44:28 +00:00
|
|
|
|
2021-09-03 11:03:38 +00:00
|
|
|
# other metadata is signed on-demand (when fetched) but roots must be
|
|
|
|
|
# explicitly published with publish_root() which maintains this list
|
2024-11-04 04:21:23 +00:00
|
|
|
self.signed_roots: list[bytes] = []
|
2021-09-03 11:03:38 +00:00
|
|
|
|
|
|
|
|
# signers are used on-demand at fetch time to sign metadata
|
2021-10-08 07:30:31 +00:00
|
|
|
# keys are roles, values are dicts of {keyid: signer}
|
2024-11-04 04:21:23 +00:00
|
|
|
self.signers: dict[str, dict[str, Signer]] = {}
|
2021-09-02 15:44:28 +00:00
|
|
|
|
2021-09-10 12:54:56 +00:00
|
|
|
# target downloads are served from this dict
|
2024-11-04 04:21:23 +00:00
|
|
|
self.target_files: dict[str, RepositoryTarget] = {}
|
2021-09-10 12:54:56 +00:00
|
|
|
|
2021-11-08 13:50:38 +00:00
|
|
|
# Whether to compute hashes and length for meta in snapshot/timestamp
|
2021-10-07 14:29:22 +00:00
|
|
|
self.compute_metafile_hashes_length = False
|
|
|
|
|
|
2021-11-08 13:50:38 +00:00
|
|
|
# Enable hash-prefixed target file names
|
|
|
|
|
self.prefix_targets_with_hash = True
|
|
|
|
|
|
2024-11-29 10:29:32 +00:00
|
|
|
self.dump_dir: str | None = None
|
2021-09-03 13:53:33 +00:00
|
|
|
self.dump_version = 0
|
|
|
|
|
|
2021-12-03 07:59:07 +00:00
|
|
|
self.fetch_tracker = FetchTracker()
|
|
|
|
|
|
2024-02-26 20:27:38 +00:00
|
|
|
now = datetime.datetime.now(datetime.timezone.utc)
|
2022-02-03 22:14:02 +00:00
|
|
|
self.safe_expiry = now.replace(microsecond=0) + datetime.timedelta(
|
|
|
|
|
days=30
|
|
|
|
|
)
|
2021-09-22 10:04:33 +00:00
|
|
|
|
2021-09-02 15:44:28 +00:00
|
|
|
self._initialize()
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def root(self) -> Root:
|
2021-09-03 11:03:38 +00:00
|
|
|
return self.md_root.signed
|
2021-09-02 15:44:28 +00:00
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def timestamp(self) -> Timestamp:
|
|
|
|
|
return self.md_timestamp.signed
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def snapshot(self) -> Snapshot:
|
|
|
|
|
return self.md_snapshot.signed
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def targets(self) -> Targets:
|
|
|
|
|
return self.md_targets.signed
|
|
|
|
|
|
2024-11-04 04:21:23 +00:00
|
|
|
def all_targets(self) -> Iterator[tuple[str, Targets]]:
|
2021-10-13 16:56:53 +00:00
|
|
|
"""Yield role name and signed portion of targets one by one."""
|
2021-11-17 12:24:03 +00:00
|
|
|
yield Targets.type, self.md_targets.signed
|
2021-09-02 15:44:28 +00:00
|
|
|
for role, md in self.md_delegates.items():
|
|
|
|
|
yield role, md.signed
|
|
|
|
|
|
2024-04-24 08:36:57 +00:00
|
|
|
def add_signer(self, role: str, signer: Signer) -> None:
|
2021-10-08 07:30:31 +00:00
|
|
|
if role not in self.signers:
|
|
|
|
|
self.signers[role] = {}
|
2024-04-24 08:36:57 +00:00
|
|
|
self.signers[role][signer.public_key.keyid] = signer
|
2021-10-08 07:30:31 +00:00
|
|
|
|
2022-01-11 17:06:32 +00:00
|
|
|
def rotate_keys(self, role: str) -> None:
|
|
|
|
|
"""remove all keys for role, then add threshold of new keys"""
|
|
|
|
|
self.root.roles[role].keyids.clear()
|
|
|
|
|
self.signers[role].clear()
|
2024-04-03 10:44:36 +00:00
|
|
|
for _ in range(self.root.roles[role].threshold):
|
2024-04-24 08:36:57 +00:00
|
|
|
signer = CryptoSigner.generate_ed25519()
|
|
|
|
|
self.root.add_key(signer.public_key, role)
|
2022-01-11 17:06:32 +00:00
|
|
|
self.add_signer(role, signer)
|
|
|
|
|
|
2021-11-18 16:58:16 +00:00
|
|
|
def _initialize(self) -> None:
|
2021-10-13 16:56:53 +00:00
|
|
|
"""Setup a minimal valid repository."""
|
2021-09-02 15:44:28 +00:00
|
|
|
|
2022-03-23 15:32:04 +00:00
|
|
|
self.md_targets = Metadata(Targets(expires=self.safe_expiry))
|
|
|
|
|
self.md_snapshot = Metadata(Snapshot(expires=self.safe_expiry))
|
|
|
|
|
self.md_timestamp = Metadata(Timestamp(expires=self.safe_expiry))
|
|
|
|
|
self.md_root = Metadata(Root(expires=self.safe_expiry))
|
2021-10-20 12:29:02 +00:00
|
|
|
|
2021-10-25 10:20:13 +00:00
|
|
|
for role in TOP_LEVEL_ROLE_NAMES:
|
2024-04-24 08:36:57 +00:00
|
|
|
signer = CryptoSigner.generate_ed25519()
|
|
|
|
|
self.md_root.signed.add_key(signer.public_key, role)
|
2021-10-08 07:30:31 +00:00
|
|
|
self.add_signer(role, signer)
|
|
|
|
|
|
2021-09-03 11:03:38 +00:00
|
|
|
self.publish_root()
|
|
|
|
|
|
2021-11-18 16:58:16 +00:00
|
|
|
def publish_root(self) -> None:
|
2021-10-13 16:56:53 +00:00
|
|
|
"""Sign and store a new serialized version of root."""
|
2021-09-03 11:03:38 +00:00
|
|
|
self.md_root.signatures.clear()
|
2021-11-17 12:24:03 +00:00
|
|
|
for signer in self.signers[Root.type].values():
|
2021-10-08 07:30:31 +00:00
|
|
|
self.md_root.sign(signer, append=True)
|
2021-09-03 11:03:38 +00:00
|
|
|
|
|
|
|
|
self.signed_roots.append(self.md_root.to_bytes(JSONSerializer()))
|
|
|
|
|
logger.debug("Published root v%d", self.root.version)
|
2021-09-02 15:44:28 +00:00
|
|
|
|
2022-01-31 11:43:34 +00:00
|
|
|
def _fetch(self, url: str) -> Iterator[bytes]:
|
2021-10-13 16:56:53 +00:00
|
|
|
"""Fetches data from the given url and returns an Iterator (or yields
|
|
|
|
|
bytes).
|
|
|
|
|
"""
|
2021-09-22 10:04:33 +00:00
|
|
|
path = parse.urlparse(url).path
|
|
|
|
|
if path.startswith("/metadata/") and path.endswith(".json"):
|
2021-11-08 13:50:38 +00:00
|
|
|
# figure out rolename and version
|
2021-09-22 10:04:33 +00:00
|
|
|
ver_and_name = path[len("/metadata/") :][: -len(".json")]
|
2021-11-18 16:58:16 +00:00
|
|
|
version_str, _, role = ver_and_name.partition(".")
|
2021-11-08 13:50:38 +00:00
|
|
|
# root is always version-prefixed while timestamp is always NOT
|
2021-11-17 12:24:03 +00:00
|
|
|
if role == Root.type or (
|
|
|
|
|
self.root.consistent_snapshot and ver_and_name != Timestamp.type
|
2021-11-08 13:50:38 +00:00
|
|
|
):
|
2024-11-29 10:29:32 +00:00
|
|
|
version: int | None = int(version_str)
|
2021-11-08 13:50:38 +00:00
|
|
|
else:
|
|
|
|
|
# the file is not version-prefixed
|
|
|
|
|
role = ver_and_name
|
|
|
|
|
version = None
|
|
|
|
|
|
2021-12-09 13:24:40 +00:00
|
|
|
yield self.fetch_metadata(role, version)
|
2021-09-22 10:04:33 +00:00
|
|
|
elif path.startswith("/targets/"):
|
2021-09-24 08:45:48 +00:00
|
|
|
# figure out target path and hash prefix
|
2021-09-22 10:04:33 +00:00
|
|
|
target_path = path[len("/targets/") :]
|
2021-10-12 13:14:24 +00:00
|
|
|
dir_parts, sep, prefixed_filename = target_path.rpartition("/")
|
2021-11-08 13:50:38 +00:00
|
|
|
# extract the hash prefix, if any
|
2024-11-29 10:29:32 +00:00
|
|
|
prefix: str | None = None
|
2021-11-18 16:58:16 +00:00
|
|
|
filename = prefixed_filename
|
2021-11-08 13:50:38 +00:00
|
|
|
if self.root.consistent_snapshot and self.prefix_targets_with_hash:
|
|
|
|
|
prefix, _, filename = prefixed_filename.partition(".")
|
2021-09-10 12:54:56 +00:00
|
|
|
target_path = f"{dir_parts}{sep}{filename}"
|
|
|
|
|
|
2021-12-09 13:24:40 +00:00
|
|
|
yield self.fetch_target(target_path, prefix)
|
2021-09-02 15:44:28 +00:00
|
|
|
else:
|
2022-01-31 11:56:46 +00:00
|
|
|
raise DownloadHTTPError(f"Unknown path '{path}'", 404)
|
2021-09-02 15:44:28 +00:00
|
|
|
|
2024-11-29 10:29:32 +00:00
|
|
|
def fetch_target(self, target_path: str, target_hash: str | None) -> bytes:
|
2021-10-13 16:56:53 +00:00
|
|
|
"""Return data for 'target_path', checking 'target_hash' if it is given.
|
2021-09-24 08:45:48 +00:00
|
|
|
|
2021-10-13 16:56:53 +00:00
|
|
|
If hash is None, then consistent_snapshot is not used.
|
2021-09-24 08:45:48 +00:00
|
|
|
"""
|
2021-12-03 07:59:07 +00:00
|
|
|
self.fetch_tracker.targets.append((target_path, target_hash))
|
|
|
|
|
|
2021-09-10 12:54:56 +00:00
|
|
|
repo_target = self.target_files.get(target_path)
|
|
|
|
|
if repo_target is None:
|
2022-01-31 11:56:46 +00:00
|
|
|
raise DownloadHTTPError(f"No target {target_path}", 404)
|
2021-10-13 16:56:53 +00:00
|
|
|
if (
|
|
|
|
|
target_hash
|
|
|
|
|
and target_hash not in repo_target.target_file.hashes.values()
|
|
|
|
|
):
|
2022-01-31 11:56:46 +00:00
|
|
|
raise DownloadHTTPError(f"hash mismatch for {target_path}", 404)
|
2021-09-10 12:54:56 +00:00
|
|
|
|
|
|
|
|
logger.debug("fetched target %s", target_path)
|
|
|
|
|
return repo_target.data
|
|
|
|
|
|
2024-11-29 10:29:32 +00:00
|
|
|
def fetch_metadata(self, role: str, version: int | None = None) -> bytes:
|
2021-09-24 08:45:48 +00:00
|
|
|
"""Return signed metadata for 'role', using 'version' if it is given.
|
|
|
|
|
|
2021-10-13 16:56:53 +00:00
|
|
|
If version is None, non-versioned metadata is being requested.
|
2021-09-24 08:45:48 +00:00
|
|
|
"""
|
2021-12-03 07:59:07 +00:00
|
|
|
self.fetch_tracker.metadata.append((role, version))
|
2022-01-06 08:10:27 +00:00
|
|
|
# decode role for the metadata
|
|
|
|
|
role = parse.unquote(role, encoding="utf-8")
|
2021-12-03 07:59:07 +00:00
|
|
|
|
2021-11-17 12:24:03 +00:00
|
|
|
if role == Root.type:
|
2021-09-03 11:03:38 +00:00
|
|
|
# return a version previously serialized in publish_root()
|
2021-09-10 06:30:58 +00:00
|
|
|
if version is None or version > len(self.signed_roots):
|
2022-01-31 11:56:46 +00:00
|
|
|
raise DownloadHTTPError(f"Unknown root version {version}", 404)
|
2021-10-13 16:56:53 +00:00
|
|
|
logger.debug("fetched root version %d", version)
|
2021-09-03 11:03:38 +00:00
|
|
|
return self.signed_roots[version - 1]
|
2021-10-13 16:56:53 +00:00
|
|
|
|
|
|
|
|
# sign and serialize the requested metadata
|
2024-11-29 10:29:32 +00:00
|
|
|
md: Metadata | None
|
2021-11-17 12:24:03 +00:00
|
|
|
if role == Timestamp.type:
|
2021-11-18 16:58:16 +00:00
|
|
|
md = self.md_timestamp
|
2021-11-17 12:24:03 +00:00
|
|
|
elif role == Snapshot.type:
|
2021-10-13 16:56:53 +00:00
|
|
|
md = self.md_snapshot
|
2021-11-17 12:24:03 +00:00
|
|
|
elif role == Targets.type:
|
2021-10-13 16:56:53 +00:00
|
|
|
md = self.md_targets
|
2021-09-02 15:44:28 +00:00
|
|
|
else:
|
2021-11-08 13:50:38 +00:00
|
|
|
md = self.md_delegates.get(role)
|
2021-09-02 15:44:28 +00:00
|
|
|
|
2021-10-13 16:56:53 +00:00
|
|
|
if md is None:
|
2022-01-31 11:56:46 +00:00
|
|
|
raise DownloadHTTPError(f"Unknown role {role}", 404)
|
2021-09-02 15:44:28 +00:00
|
|
|
|
2021-10-13 16:56:53 +00:00
|
|
|
md.signatures.clear()
|
|
|
|
|
for signer in self.signers[role].values():
|
|
|
|
|
md.sign(signer, append=True)
|
2021-09-02 15:44:28 +00:00
|
|
|
|
2021-10-13 16:56:53 +00:00
|
|
|
logger.debug(
|
|
|
|
|
"fetched %s v%d with %d sigs",
|
|
|
|
|
role,
|
|
|
|
|
md.signed.version,
|
|
|
|
|
len(self.signers[role]),
|
|
|
|
|
)
|
|
|
|
|
return md.to_bytes(JSONSerializer())
|
2021-09-02 15:44:28 +00:00
|
|
|
|
2021-10-07 14:29:22 +00:00
|
|
|
def _compute_hashes_and_length(
|
|
|
|
|
self, role: str
|
2024-11-04 04:21:23 +00:00
|
|
|
) -> tuple[dict[str, str], int]:
|
2021-12-09 13:24:40 +00:00
|
|
|
data = self.fetch_metadata(role)
|
2025-03-19 08:28:01 +00:00
|
|
|
digest_object = hashlib.new(_HASH_ALGORITHM)
|
2021-10-07 14:29:22 +00:00
|
|
|
digest_object.update(data)
|
2025-03-19 08:28:01 +00:00
|
|
|
hashes = {_HASH_ALGORITHM: digest_object.hexdigest()}
|
2021-10-07 14:29:22 +00:00
|
|
|
return hashes, len(data)
|
|
|
|
|
|
2021-11-18 16:58:16 +00:00
|
|
|
def update_timestamp(self) -> None:
|
2021-10-13 16:56:53 +00:00
|
|
|
"""Update timestamp and assign snapshot version to snapshot_meta
|
|
|
|
|
version.
|
|
|
|
|
"""
|
2021-09-02 15:44:28 +00:00
|
|
|
|
2021-11-24 13:07:36 +00:00
|
|
|
hashes = None
|
|
|
|
|
length = None
|
2021-10-07 14:29:22 +00:00
|
|
|
if self.compute_metafile_hashes_length:
|
2021-11-17 12:24:03 +00:00
|
|
|
hashes, length = self._compute_hashes_and_length(Snapshot.type)
|
2021-11-24 13:07:36 +00:00
|
|
|
|
|
|
|
|
self.timestamp.snapshot_meta = MetaFile(
|
|
|
|
|
self.snapshot.version, length, hashes
|
|
|
|
|
)
|
2021-10-07 14:29:22 +00:00
|
|
|
|
2021-09-02 15:44:28 +00:00
|
|
|
self.timestamp.version += 1
|
|
|
|
|
|
2021-11-18 16:58:16 +00:00
|
|
|
def update_snapshot(self) -> None:
|
2021-10-13 16:56:53 +00:00
|
|
|
"""Update snapshot, assign targets versions and update timestamp."""
|
2021-10-08 13:46:52 +00:00
|
|
|
for role, delegate in self.all_targets():
|
2021-09-22 10:04:33 +00:00
|
|
|
hashes = None
|
|
|
|
|
length = None
|
2021-10-07 14:29:22 +00:00
|
|
|
if self.compute_metafile_hashes_length:
|
|
|
|
|
hashes, length = self._compute_hashes_and_length(role)
|
2021-09-22 10:04:33 +00:00
|
|
|
|
|
|
|
|
self.snapshot.meta[f"{role}.json"] = MetaFile(
|
|
|
|
|
delegate.version, length, hashes
|
|
|
|
|
)
|
2021-10-07 14:29:22 +00:00
|
|
|
|
2021-09-02 15:44:28 +00:00
|
|
|
self.snapshot.version += 1
|
|
|
|
|
self.update_timestamp()
|
2021-09-03 13:53:33 +00:00
|
|
|
|
2022-06-02 17:15:59 +00:00
|
|
|
def _get_delegator(self, delegator_name: str) -> Targets:
|
|
|
|
|
"""Given a delegator name return, its corresponding Targets object."""
|
|
|
|
|
if delegator_name == Targets.type:
|
|
|
|
|
return self.targets
|
|
|
|
|
|
|
|
|
|
return self.md_delegates[delegator_name].signed
|
|
|
|
|
|
2021-11-18 16:58:16 +00:00
|
|
|
def add_target(self, role: str, data: bytes, path: str) -> None:
|
2021-10-13 16:56:53 +00:00
|
|
|
"""Create a target from data and add it to the target_files."""
|
2022-06-02 17:15:59 +00:00
|
|
|
targets = self._get_delegator(role)
|
2021-09-10 12:54:56 +00:00
|
|
|
|
|
|
|
|
target = TargetFile.from_data(path, data, ["sha256"])
|
|
|
|
|
targets.targets[path] = target
|
|
|
|
|
self.target_files[path] = RepositoryTarget(data, target)
|
|
|
|
|
|
2021-09-22 10:04:33 +00:00
|
|
|
def add_delegation(
|
2021-11-22 14:28:43 +00:00
|
|
|
self, delegator_name: str, role: DelegatedRole, targets: Targets
|
2021-11-18 16:58:16 +00:00
|
|
|
) -> None:
|
2021-10-13 16:56:53 +00:00
|
|
|
"""Add delegated target role to the repository."""
|
2022-06-02 17:15:59 +00:00
|
|
|
delegator = self._get_delegator(delegator_name)
|
2021-09-22 10:04:33 +00:00
|
|
|
|
2022-06-02 17:24:57 +00:00
|
|
|
if (
|
|
|
|
|
delegator.delegations is not None
|
|
|
|
|
and delegator.delegations.succinct_roles is not None
|
|
|
|
|
):
|
|
|
|
|
raise ValueError("Can't add a role when succinct_roles is used")
|
|
|
|
|
|
2021-09-22 10:04:33 +00:00
|
|
|
# Create delegation
|
|
|
|
|
if delegator.delegations is None:
|
2022-06-02 17:24:57 +00:00
|
|
|
delegator.delegations = Delegations({}, roles={})
|
|
|
|
|
|
|
|
|
|
assert delegator.delegations.roles is not None
|
2021-09-22 10:04:33 +00:00
|
|
|
# put delegation last by default
|
|
|
|
|
delegator.delegations.roles[role.name] = role
|
|
|
|
|
|
|
|
|
|
# By default add one new key for the role
|
2024-04-24 08:36:57 +00:00
|
|
|
signer = CryptoSigner.generate_ed25519()
|
|
|
|
|
delegator.add_key(signer.public_key, role.name)
|
2021-10-08 07:30:31 +00:00
|
|
|
self.add_signer(role.name, signer)
|
2021-09-22 10:04:33 +00:00
|
|
|
|
|
|
|
|
# Add metadata for the role
|
2021-11-22 14:28:43 +00:00
|
|
|
if role.name not in self.md_delegates:
|
Remove OrderedDict in favor of python3.7+ dict
After we drop support for python3.6 we can relly that dictionaries
preserve the insertion order:
https://docs.python.org/3.7/whatsnew/3.7.html
This means we can replace the usage of OrderedDict with a standard
dictionaries.
Something we have to keep in mind is that even thought the insertion
order is preserved the equality comparison for normal dicts is
insensitive for normal dicts compared to OrderedDict
For example:
>>> OrderedDict([(1,1), (2,2)]) == OrderedDict([(2,2), (1,1)])
False
>>> dict([(1,1), (2,2)]) == dict([(2,2), (1,1)])
True
Signed-off-by: Martin Vrachev <mvrachev@vmware.com>
2022-01-19 16:19:56 +00:00
|
|
|
self.md_delegates[role.name] = Metadata(targets, {})
|
2021-09-22 10:04:33 +00:00
|
|
|
|
2022-06-02 17:15:59 +00:00
|
|
|
def add_succinct_roles(
|
|
|
|
|
self, delegator_name: str, bit_length: int, name_prefix: str
|
|
|
|
|
) -> None:
|
|
|
|
|
"""Add succinct roles info to a delegator with name "delegator_name".
|
|
|
|
|
|
|
|
|
|
Note that for each delegated role represented by succinct roles an empty
|
|
|
|
|
Targets instance is created.
|
|
|
|
|
"""
|
|
|
|
|
delegator = self._get_delegator(delegator_name)
|
|
|
|
|
|
|
|
|
|
if (
|
|
|
|
|
delegator.delegations is not None
|
|
|
|
|
and delegator.delegations.roles is not None
|
|
|
|
|
):
|
|
|
|
|
raise ValueError(
|
|
|
|
|
"Can't add a succinct_roles when delegated roles are used"
|
|
|
|
|
)
|
|
|
|
|
|
2024-04-24 08:36:57 +00:00
|
|
|
signer = CryptoSigner.generate_ed25519()
|
2022-06-02 17:15:59 +00:00
|
|
|
succinct_roles = SuccinctRoles([], 1, bit_length, name_prefix)
|
|
|
|
|
delegator.delegations = Delegations({}, None, succinct_roles)
|
|
|
|
|
|
|
|
|
|
# Add targets metadata for all bins.
|
|
|
|
|
for delegated_name in succinct_roles.get_roles():
|
|
|
|
|
self.md_delegates[delegated_name] = Metadata(
|
|
|
|
|
Targets(expires=self.safe_expiry)
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
self.add_signer(delegated_name, signer)
|
|
|
|
|
|
2024-04-24 08:36:57 +00:00
|
|
|
delegator.add_key(signer.public_key)
|
2022-06-02 17:15:59 +00:00
|
|
|
|
2021-11-18 16:58:16 +00:00
|
|
|
def write(self) -> None:
|
2021-09-03 13:53:33 +00:00
|
|
|
"""Dump current repository metadata to self.dump_dir
|
|
|
|
|
|
|
|
|
|
This is a debugging tool: dumping repository state before running
|
|
|
|
|
Updater refresh may be useful while debugging a test.
|
|
|
|
|
"""
|
|
|
|
|
if self.dump_dir is None:
|
|
|
|
|
self.dump_dir = tempfile.mkdtemp()
|
|
|
|
|
print(f"Repository Simulator dumps in {self.dump_dir}")
|
|
|
|
|
|
|
|
|
|
self.dump_version += 1
|
2021-10-13 16:56:53 +00:00
|
|
|
dest_dir = os.path.join(self.dump_dir, str(self.dump_version))
|
|
|
|
|
os.makedirs(dest_dir)
|
2021-09-03 13:53:33 +00:00
|
|
|
|
|
|
|
|
for ver in range(1, len(self.signed_roots) + 1):
|
2021-10-13 16:56:53 +00:00
|
|
|
with open(os.path.join(dest_dir, f"{ver}.root.json"), "wb") as f:
|
2021-12-09 13:24:40 +00:00
|
|
|
f.write(self.fetch_metadata(Root.type, ver))
|
2021-09-03 13:53:33 +00:00
|
|
|
|
2021-11-17 12:24:03 +00:00
|
|
|
for role in [Timestamp.type, Snapshot.type, Targets.type]:
|
2021-10-13 16:56:53 +00:00
|
|
|
with open(os.path.join(dest_dir, f"{role}.json"), "wb") as f:
|
2021-12-09 13:24:40 +00:00
|
|
|
f.write(self.fetch_metadata(role))
|
2021-09-03 13:53:33 +00:00
|
|
|
|
2021-10-13 16:56:53 +00:00
|
|
|
for role in self.md_delegates:
|
2021-12-01 18:03:55 +00:00
|
|
|
quoted_role = parse.quote(role, "")
|
|
|
|
|
with open(os.path.join(dest_dir, f"{quoted_role}.json"), "wb") as f:
|
2021-12-09 13:24:40 +00:00
|
|
|
f.write(self.fetch_metadata(role))
|