mirror of
https://github.com/theupdateframework/python-tuf
synced 2026-05-24 10:08:28 +00:00
Merge pull request #2731 from NicholasTanz/updateAnnotations
update python annotations
This commit is contained in:
commit
f2aeb97add
28 changed files with 187 additions and 186 deletions
|
|
@ -25,7 +25,6 @@
|
|||
import tempfile
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from pathlib import Path
|
||||
from typing import Dict
|
||||
|
||||
from securesystemslib.signer import CryptoSigner, Signer
|
||||
|
||||
|
|
@ -87,8 +86,8 @@ def _in(days: float) -> datetime:
|
|||
|
||||
# Define containers for role objects and cryptographic keys created below. This
|
||||
# allows us to sign and write metadata in a batch more easily.
|
||||
roles: Dict[str, Metadata] = {}
|
||||
signers: Dict[str, Signer] = {}
|
||||
roles: dict[str, Metadata] = {}
|
||||
signers: dict[str, Signer] = {}
|
||||
|
||||
|
||||
# Targets (integrity)
|
||||
|
|
|
|||
|
|
@ -19,9 +19,9 @@
|
|||
import hashlib
|
||||
import os
|
||||
import tempfile
|
||||
from collections.abc import Iterator
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from pathlib import Path
|
||||
from typing import Dict, Iterator, List, Tuple
|
||||
|
||||
from securesystemslib.signer import CryptoSigner, Signer
|
||||
|
||||
|
|
@ -42,8 +42,8 @@ def _in(days: float) -> datetime:
|
|||
)
|
||||
|
||||
|
||||
roles: Dict[str, Metadata[Targets]] = {}
|
||||
signers: Dict[str, Signer] = {}
|
||||
roles: dict[str, Metadata[Targets]] = {}
|
||||
signers: dict[str, Signer] = {}
|
||||
|
||||
# Hash bin delegation
|
||||
# ===================
|
||||
|
|
@ -96,7 +96,7 @@ def _bin_name(low: int, high: int) -> str:
|
|||
return f"{low:0{PREFIX_LEN}x}-{high:0{PREFIX_LEN}x}"
|
||||
|
||||
|
||||
def generate_hash_bins() -> Iterator[Tuple[str, List[str]]]:
|
||||
def generate_hash_bins() -> Iterator[tuple[str, list[str]]]:
|
||||
"""Returns generator for bin names and hash prefixes per bin."""
|
||||
# Iterate over the total number of hash prefixes in 'bin size'-steps to
|
||||
# generate bin names and a list of hash prefixes served by each bin.
|
||||
|
|
|
|||
|
|
@ -23,7 +23,6 @@
|
|||
import tempfile
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from pathlib import Path
|
||||
from typing import Dict
|
||||
|
||||
from securesystemslib.signer import CryptoSigner
|
||||
|
||||
|
|
@ -105,7 +104,7 @@
|
|||
bit_length=BIT_LENGTH,
|
||||
name_prefix=NAME_PREFIX,
|
||||
)
|
||||
delegations_keys_info: Dict[str, Key] = {}
|
||||
delegations_keys_info: dict[str, Key] = {}
|
||||
delegations_keys_info[bins_key.keyid] = bins_key
|
||||
|
||||
targets.signed.delegations = Delegations(
|
||||
|
|
@ -119,7 +118,7 @@
|
|||
|
||||
assert targets.signed.delegations.succinct_roles is not None # make mypy happy
|
||||
|
||||
delegated_bins: Dict[str, Metadata[Targets]] = {}
|
||||
delegated_bins: dict[str, Metadata[Targets]] = {}
|
||||
for delegated_bin_name in targets.signed.delegations.succinct_roles.get_roles():
|
||||
delegated_bins[delegated_bin_name] = Metadata(
|
||||
Targets(expires=expiration_date)
|
||||
|
|
|
|||
|
|
@ -8,7 +8,7 @@
|
|||
import logging
|
||||
from collections import defaultdict
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from typing import Dict, List, Union
|
||||
from typing import Union
|
||||
|
||||
from securesystemslib.signer import CryptoSigner, Key, Signer
|
||||
|
||||
|
|
@ -59,16 +59,16 @@ class SimpleRepository(Repository):
|
|||
|
||||
def __init__(self) -> None:
|
||||
# all versions of all metadata
|
||||
self.role_cache: Dict[str, List[Metadata]] = defaultdict(list)
|
||||
self.role_cache: dict[str, list[Metadata]] = defaultdict(list)
|
||||
# all current keys
|
||||
self.signer_cache: Dict[str, List[Signer]] = defaultdict(list)
|
||||
self.signer_cache: dict[str, list[Signer]] = defaultdict(list)
|
||||
# all target content
|
||||
self.target_cache: Dict[str, bytes] = {}
|
||||
self.target_cache: dict[str, bytes] = {}
|
||||
# version cache for snapshot and all targets, updated in close().
|
||||
# The 'defaultdict(lambda: ...)' trick allows close() to easily modify
|
||||
# the version without always creating a new MetaFile
|
||||
self._snapshot_info = MetaFile(1)
|
||||
self._targets_infos: Dict[str, MetaFile] = defaultdict(
|
||||
self._targets_infos: dict[str, MetaFile] = defaultdict(
|
||||
lambda: MetaFile(1)
|
||||
)
|
||||
|
||||
|
|
@ -84,7 +84,7 @@ def __init__(self) -> None:
|
|||
pass
|
||||
|
||||
@property
|
||||
def targets_infos(self) -> Dict[str, MetaFile]:
|
||||
def targets_infos(self) -> dict[str, MetaFile]:
|
||||
return self._targets_infos
|
||||
|
||||
@property
|
||||
|
|
|
|||
|
|
@ -9,7 +9,6 @@
|
|||
import logging
|
||||
import os
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from typing import Dict
|
||||
|
||||
import requests
|
||||
from securesystemslib.signer import CryptoSigner, Signer
|
||||
|
|
@ -50,7 +49,7 @@ def __init__(self, metadata_dir: str, key_dir: str, base_url: str):
|
|||
self.updater.refresh()
|
||||
|
||||
@property
|
||||
def targets_infos(self) -> Dict[str, MetaFile]:
|
||||
def targets_infos(self) -> dict[str, MetaFile]:
|
||||
raise NotImplementedError # we never call snapshot
|
||||
|
||||
@property
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ name = "tuf"
|
|||
description = "A secure updater framework for Python"
|
||||
readme = "README.md"
|
||||
license = { text = "MIT OR Apache-2.0" }
|
||||
requires-python = ">=3.8"
|
||||
requires-python = ">=3.9"
|
||||
authors = [
|
||||
{ email = "theupdateframework@googlegroups.com" },
|
||||
]
|
||||
|
|
|
|||
|
|
@ -6,7 +6,7 @@
|
|||
import os
|
||||
import sys
|
||||
from datetime import datetime, timezone
|
||||
from typing import List, Optional
|
||||
from typing import Optional
|
||||
|
||||
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
|
||||
from securesystemslib.signer import CryptoSigner, Signer, SSlibKey
|
||||
|
|
@ -16,13 +16,13 @@
|
|||
from tuf.api.serialization.json import JSONSerializer
|
||||
|
||||
# Hardcode keys and expiry time to achieve reproducibility.
|
||||
public_values: List[str] = [
|
||||
public_values: list[str] = [
|
||||
"b11d2ff132c033a657318c74c39526476c56de7556c776f11070842dbc4ac14c",
|
||||
"250f9ae3d1d3d5c419a73cfb4a470c01de1d5d3d61a3825416b5f5d6b88f4a30",
|
||||
"82380623abb9666d4bf274b1a02577469445a972e5650d270101faa5107b19c8",
|
||||
"0e6738fc1ac6fb4de680b4be99ecbcd99b030f3963f291277eef67bb9bd123e9",
|
||||
]
|
||||
private_values: List[bytes] = [
|
||||
private_values: list[bytes] = [
|
||||
bytes.fromhex(
|
||||
"510e5e04d7a364af850533856eacdf65d30cc0f8803ecd5fdc0acc56ca2aa91c"
|
||||
),
|
||||
|
|
@ -36,14 +36,14 @@
|
|||
"7e2e751145d1b22f6e40d4ba2aa47158207acfd3c003f1cbd5a08141dfc22a15"
|
||||
),
|
||||
]
|
||||
keyids: List[str] = [
|
||||
keyids: list[str] = [
|
||||
"5822582e7072996c1eef1cec24b61115d364987faa486659fe3d3dce8dae2aba",
|
||||
"09d440e3725cec247dcb8703b646a87dd2a4d75343e8095c036c32795eefe3b9",
|
||||
"3458204ed467519c19a5316eb278b5608472a1bbf15850ebfb462d5315e4f86d",
|
||||
"2be5c21e3614f9f178fb49c4a34d0c18ffac30abd14ced917c60a52c8d8094b7",
|
||||
]
|
||||
|
||||
signers: List[Signer] = []
|
||||
signers: list[Signer] = []
|
||||
for index in range(len(keyids)):
|
||||
key = SSlibKey(
|
||||
keyids[index],
|
||||
|
|
|
|||
|
|
@ -46,8 +46,9 @@
|
|||
import logging
|
||||
import os
|
||||
import tempfile
|
||||
from collections.abc import Iterator
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Dict, Iterator, List, Optional, Tuple
|
||||
from typing import Optional
|
||||
from urllib import parse
|
||||
|
||||
import securesystemslib.hash as sslib_hash
|
||||
|
|
@ -80,8 +81,8 @@
|
|||
class FetchTracker:
|
||||
"""Fetcher counter for metadata and targets."""
|
||||
|
||||
metadata: List[Tuple[str, Optional[int]]] = field(default_factory=list)
|
||||
targets: List[Tuple[str, Optional[str]]] = field(default_factory=list)
|
||||
metadata: list[tuple[str, Optional[int]]] = field(default_factory=list)
|
||||
targets: list[tuple[str, Optional[str]]] = field(default_factory=list)
|
||||
|
||||
|
||||
@dataclass
|
||||
|
|
@ -96,18 +97,18 @@ class RepositorySimulator(FetcherInterface):
|
|||
"""Simulates a repository that can be used for testing."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.md_delegates: Dict[str, Metadata[Targets]] = {}
|
||||
self.md_delegates: dict[str, Metadata[Targets]] = {}
|
||||
|
||||
# other metadata is signed on-demand (when fetched) but roots must be
|
||||
# explicitly published with publish_root() which maintains this list
|
||||
self.signed_roots: List[bytes] = []
|
||||
self.signed_roots: list[bytes] = []
|
||||
|
||||
# signers are used on-demand at fetch time to sign metadata
|
||||
# keys are roles, values are dicts of {keyid: signer}
|
||||
self.signers: Dict[str, Dict[str, Signer]] = {}
|
||||
self.signers: dict[str, dict[str, Signer]] = {}
|
||||
|
||||
# target downloads are served from this dict
|
||||
self.target_files: Dict[str, RepositoryTarget] = {}
|
||||
self.target_files: dict[str, RepositoryTarget] = {}
|
||||
|
||||
# Whether to compute hashes and length for meta in snapshot/timestamp
|
||||
self.compute_metafile_hashes_length = False
|
||||
|
|
@ -143,7 +144,7 @@ def snapshot(self) -> Snapshot:
|
|||
def targets(self) -> Targets:
|
||||
return self.md_targets.signed
|
||||
|
||||
def all_targets(self) -> Iterator[Tuple[str, Targets]]:
|
||||
def all_targets(self) -> Iterator[tuple[str, Targets]]:
|
||||
"""Yield role name and signed portion of targets one by one."""
|
||||
yield Targets.type, self.md_targets.signed
|
||||
for role, md in self.md_delegates.items():
|
||||
|
|
@ -287,7 +288,7 @@ def fetch_metadata(self, role: str, version: Optional[int] = None) -> bytes:
|
|||
|
||||
def _compute_hashes_and_length(
|
||||
self, role: str
|
||||
) -> Tuple[Dict[str, str], int]:
|
||||
) -> tuple[dict[str, str], int]:
|
||||
data = self.fetch_metadata(role)
|
||||
digest_object = sslib_hash.digest(sslib_hash.DEFAULT_HASH_ALGORITHM)
|
||||
digest_object.update(data)
|
||||
|
|
|
|||
|
|
@ -12,7 +12,7 @@
|
|||
from copy import copy, deepcopy
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from pathlib import Path
|
||||
from typing import ClassVar, Dict, Optional
|
||||
from typing import ClassVar, Optional
|
||||
|
||||
from securesystemslib import exceptions as sslib_exceptions
|
||||
from securesystemslib import hash as sslib_hash
|
||||
|
|
@ -54,7 +54,7 @@ class TestMetadata(unittest.TestCase):
|
|||
temporary_directory: ClassVar[str]
|
||||
repo_dir: ClassVar[str]
|
||||
keystore_dir: ClassVar[str]
|
||||
signers: ClassVar[Dict[str, Signer]]
|
||||
signers: ClassVar[dict[str, Signer]]
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls) -> None:
|
||||
|
|
@ -763,7 +763,7 @@ def test_targets_key_api(self) -> None:
|
|||
}
|
||||
)
|
||||
assert isinstance(targets.delegations, Delegations)
|
||||
assert isinstance(targets.delegations.roles, Dict)
|
||||
assert isinstance(targets.delegations.roles, dict)
|
||||
targets.delegations.roles["role2"] = delegated_role
|
||||
|
||||
key_dict = {
|
||||
|
|
|
|||
|
|
@ -9,7 +9,7 @@
|
|||
import tempfile
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
from typing import ClassVar, List
|
||||
from typing import ClassVar
|
||||
|
||||
from tests import utils
|
||||
|
||||
|
|
@ -44,7 +44,7 @@ def tearDown(self) -> None:
|
|||
shutil.rmtree(self.base_test_dir)
|
||||
|
||||
def _run_script_and_assert_files(
|
||||
self, script_name: str, filenames_created: List[str]
|
||||
self, script_name: str, filenames_created: list[str]
|
||||
) -> None:
|
||||
"""Run script in exmple dir and assert that it created the
|
||||
files corresponding to the passed filenames inside a 'tmp*' test dir at
|
||||
|
|
|
|||
|
|
@ -10,7 +10,8 @@
|
|||
import sys
|
||||
import tempfile
|
||||
import unittest
|
||||
from typing import Any, ClassVar, Iterator
|
||||
from collections.abc import Iterator
|
||||
from typing import Any, ClassVar
|
||||
from unittest.mock import Mock, patch
|
||||
|
||||
import requests
|
||||
|
|
|
|||
|
|
@ -7,7 +7,7 @@
|
|||
import os
|
||||
import sys
|
||||
import unittest
|
||||
from typing import Any, ClassVar, Dict
|
||||
from typing import Any, ClassVar
|
||||
|
||||
from securesystemslib.signer import SSlibKey
|
||||
|
||||
|
|
@ -28,7 +28,7 @@
|
|||
class TestMetadataComparisions(unittest.TestCase):
|
||||
"""Test __eq__ for all classes inside tuf/api/metadata.py."""
|
||||
|
||||
metadata: ClassVar[Dict[str, bytes]]
|
||||
metadata: ClassVar[dict[str, bytes]]
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls) -> None:
|
||||
|
|
@ -85,7 +85,7 @@ def setUpClass(cls) -> None:
|
|||
}
|
||||
|
||||
@utils.run_sub_tests_with_dataset(classes_attributes_modifications)
|
||||
def test_classes_eq_(self, test_case_data: Dict[str, Any]) -> None:
|
||||
def test_classes_eq_(self, test_case_data: dict[str, Any]) -> None:
|
||||
obj = self.objects[self.case_name]
|
||||
|
||||
# Assert that obj is not equal to an object from another type
|
||||
|
|
|
|||
|
|
@ -9,7 +9,6 @@
|
|||
import unittest
|
||||
from collections import defaultdict
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from typing import Dict, List
|
||||
|
||||
from securesystemslib.signer import CryptoSigner, Signer
|
||||
|
||||
|
|
@ -57,14 +56,14 @@ class TestingRepository(Repository):
|
|||
|
||||
def __init__(self) -> None:
|
||||
# all versions of all metadata
|
||||
self.role_cache: Dict[str, List[Metadata]] = defaultdict(list)
|
||||
self.role_cache: dict[str, list[Metadata]] = defaultdict(list)
|
||||
# all current keys
|
||||
self.signer_cache: Dict[str, List[Signer]] = defaultdict(list)
|
||||
self.signer_cache: dict[str, list[Signer]] = defaultdict(list)
|
||||
# version cache for snapshot and all targets, updated in close().
|
||||
# The 'defaultdict(lambda: ...)' trick allows close() to easily modify
|
||||
# the version without always creating a new MetaFile
|
||||
self._snapshot_info = MetaFile(1)
|
||||
self._targets_infos: Dict[str, MetaFile] = defaultdict(
|
||||
self._targets_infos: dict[str, MetaFile] = defaultdict(
|
||||
lambda: MetaFile(1)
|
||||
)
|
||||
|
||||
|
|
@ -80,7 +79,7 @@ def __init__(self) -> None:
|
|||
pass
|
||||
|
||||
@property
|
||||
def targets_infos(self) -> Dict[str, MetaFile]:
|
||||
def targets_infos(self) -> dict[str, MetaFile]:
|
||||
return self._targets_infos
|
||||
|
||||
@property
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@
|
|||
import sys
|
||||
import unittest
|
||||
from datetime import datetime, timezone
|
||||
from typing import Callable, ClassVar, Dict, List, Optional, Tuple
|
||||
from typing import Callable, ClassVar, Optional
|
||||
|
||||
from securesystemslib.signer import Signer
|
||||
|
||||
|
|
@ -34,8 +34,8 @@
|
|||
class TestTrustedMetadataSet(unittest.TestCase):
|
||||
"""Tests for all public API of the TrustedMetadataSet class."""
|
||||
|
||||
keystore: ClassVar[Dict[str, Signer]]
|
||||
metadata: ClassVar[Dict[str, bytes]]
|
||||
keystore: ClassVar[dict[str, Signer]]
|
||||
metadata: ClassVar[dict[str, bytes]]
|
||||
repo_dir: ClassVar[str]
|
||||
|
||||
@classmethod
|
||||
|
|
@ -232,7 +232,7 @@ def test_bad_root_update(self) -> None:
|
|||
self.trusted_set.update_root(self.metadata[Snapshot.type])
|
||||
|
||||
def test_top_level_md_with_invalid_json(self) -> None:
|
||||
top_level_md: List[Tuple[bytes, Callable[[bytes], Signed]]] = [
|
||||
top_level_md: list[tuple[bytes, Callable[[bytes], Signed]]] = [
|
||||
(self.metadata[Timestamp.type], self.trusted_set.update_timestamp),
|
||||
(self.metadata[Snapshot.type], self.trusted_set.update_snapshot),
|
||||
(self.metadata[Targets.type], self.trusted_set.update_targets),
|
||||
|
|
|
|||
|
|
@ -7,7 +7,8 @@
|
|||
import sys
|
||||
import tempfile
|
||||
import unittest
|
||||
from typing import Any, Dict, Iterable, List, Optional
|
||||
from collections.abc import Iterable
|
||||
from typing import Any, Optional
|
||||
|
||||
from tests import utils
|
||||
from tests.repository_simulator import RepositorySimulator
|
||||
|
|
@ -120,13 +121,13 @@ def _assert_targets_files_exist(self, filenames: Iterable[str]) -> None:
|
|||
|
||||
@utils.run_sub_tests_with_dataset(top_level_roles_data)
|
||||
def test_top_level_roles_update(
|
||||
self, test_case_data: Dict[str, Any]
|
||||
self, test_case_data: dict[str, Any]
|
||||
) -> None:
|
||||
# Test if the client fetches and stores metadata files with the
|
||||
# correct version prefix, depending on 'consistent_snapshot' config
|
||||
try:
|
||||
consistent_snapshot: bool = test_case_data["consistent_snapshot"]
|
||||
exp_calls: List[Any] = test_case_data["calls"]
|
||||
exp_calls: list[Any] = test_case_data["calls"]
|
||||
|
||||
self.setup_subtest(consistent_snapshot)
|
||||
updater = self._init_updater()
|
||||
|
|
@ -155,7 +156,7 @@ def test_top_level_roles_update(
|
|||
|
||||
@utils.run_sub_tests_with_dataset(delegated_roles_data)
|
||||
def test_delegated_roles_update(
|
||||
self, test_case_data: Dict[str, Any]
|
||||
self, test_case_data: dict[str, Any]
|
||||
) -> None:
|
||||
# Test if the client fetches and stores delegated metadata files with
|
||||
# the correct version prefix, depending on 'consistent_snapshot' config
|
||||
|
|
@ -211,7 +212,7 @@ def test_delegated_roles_update(
|
|||
}
|
||||
|
||||
@utils.run_sub_tests_with_dataset(targets_download_data)
|
||||
def test_download_targets(self, test_case_data: Dict[str, Any]) -> None:
|
||||
def test_download_targets(self, test_case_data: dict[str, Any]) -> None:
|
||||
# Test if the client fetches and stores target files with
|
||||
# the correct hash prefix, depending on 'consistent_snapshot'
|
||||
# and 'prefix_targets_with_hash' config
|
||||
|
|
@ -219,7 +220,7 @@ def test_download_targets(self, test_case_data: Dict[str, Any]) -> None:
|
|||
consistent_snapshot: bool = test_case_data["consistent_snapshot"]
|
||||
prefix_targets_with_hash: bool = test_case_data["prefix_targets"]
|
||||
hash_algo: Optional[str] = test_case_data["hash_algo"]
|
||||
targetpaths: List[str] = test_case_data["targetpaths"]
|
||||
targetpaths: list[str] = test_case_data["targetpaths"]
|
||||
|
||||
self.setup_subtest(consistent_snapshot, prefix_targets_with_hash)
|
||||
# Add targets to repository
|
||||
|
|
|
|||
|
|
@ -8,8 +8,9 @@
|
|||
import sys
|
||||
import tempfile
|
||||
import unittest
|
||||
from collections.abc import Iterable
|
||||
from dataclasses import astuple, dataclass, field
|
||||
from typing import Iterable, List, Optional
|
||||
from typing import Optional
|
||||
|
||||
from tests import utils
|
||||
from tests.repository_simulator import RepositorySimulator
|
||||
|
|
@ -27,11 +28,11 @@
|
|||
class TestDelegation:
|
||||
delegator: str
|
||||
rolename: str
|
||||
keyids: List[str] = field(default_factory=list)
|
||||
keyids: list[str] = field(default_factory=list)
|
||||
threshold: int = 1
|
||||
terminating: bool = False
|
||||
paths: Optional[List[str]] = field(default_factory=lambda: ["*"])
|
||||
path_hash_prefixes: Optional[List[str]] = None
|
||||
paths: Optional[list[str]] = field(default_factory=lambda: ["*"])
|
||||
path_hash_prefixes: Optional[list[str]] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
|
|
@ -46,16 +47,16 @@ class DelegationsTestCase:
|
|||
"""A delegations graph as lists of delegations and target files
|
||||
and the expected order of traversal as a list of role names."""
|
||||
|
||||
delegations: List[TestDelegation]
|
||||
target_files: List[TestTarget] = field(default_factory=list)
|
||||
visited_order: List[str] = field(default_factory=list)
|
||||
delegations: list[TestDelegation]
|
||||
target_files: list[TestTarget] = field(default_factory=list)
|
||||
visited_order: list[str] = field(default_factory=list)
|
||||
|
||||
|
||||
@dataclass
|
||||
class TargetTestCase:
|
||||
targetpath: str
|
||||
found: bool
|
||||
visited_order: List[str] = field(default_factory=list)
|
||||
visited_order: list[str] = field(default_factory=list)
|
||||
|
||||
|
||||
class TestDelegations(unittest.TestCase):
|
||||
|
|
|
|||
|
|
@ -8,7 +8,7 @@
|
|||
import tempfile
|
||||
import unittest
|
||||
from dataclasses import dataclass
|
||||
from typing import ClassVar, Dict, List, Optional, Type
|
||||
from typing import ClassVar, Optional
|
||||
|
||||
from securesystemslib.signer import CryptoSigner, Signer
|
||||
|
||||
|
|
@ -22,10 +22,10 @@
|
|||
|
||||
@dataclass
|
||||
class MdVersion:
|
||||
keys: List[int]
|
||||
keys: list[int]
|
||||
threshold: int
|
||||
sigs: List[int]
|
||||
res: Optional[Type[Exception]] = None
|
||||
sigs: list[int]
|
||||
res: Optional[type[Exception]] = None
|
||||
|
||||
|
||||
class TestUpdaterKeyRotations(unittest.TestCase):
|
||||
|
|
@ -34,8 +34,8 @@ class TestUpdaterKeyRotations(unittest.TestCase):
|
|||
# set dump_dir to trigger repository state dumps
|
||||
dump_dir: Optional[str] = None
|
||||
temp_dir: ClassVar[tempfile.TemporaryDirectory]
|
||||
keys: ClassVar[List[Key]]
|
||||
signers: ClassVar[List[Signer]]
|
||||
keys: ClassVar[list[Key]]
|
||||
signers: ClassVar[list[Signer]]
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls) -> None:
|
||||
|
|
@ -153,7 +153,7 @@ def _run_refresh(self) -> None:
|
|||
# fmt: on
|
||||
|
||||
@run_sub_tests_with_dataset(root_rotation_cases)
|
||||
def test_root_rotation(self, root_versions: List[MdVersion]) -> None:
|
||||
def test_root_rotation(self, root_versions: list[MdVersion]) -> None:
|
||||
"""Test Updater.refresh() with various sequences of root updates
|
||||
|
||||
Each MdVersion in the list describes root keys and signatures of a
|
||||
|
|
@ -198,7 +198,7 @@ def test_root_rotation(self, root_versions: List[MdVersion]) -> None:
|
|||
self.assertEqual(f.read(), expected_local_root)
|
||||
|
||||
# fmt: off
|
||||
non_root_rotation_cases: Dict[str, MdVersion] = {
|
||||
non_root_rotation_cases: dict[str, MdVersion] = {
|
||||
"1-of-1 key rotation":
|
||||
MdVersion(keys=[2], threshold=1, sigs=[2]),
|
||||
"1-of-1 key rotation, unused signatures":
|
||||
|
|
|
|||
|
|
@ -9,7 +9,7 @@
|
|||
import sys
|
||||
import tempfile
|
||||
import unittest
|
||||
from typing import Callable, ClassVar, List
|
||||
from typing import Callable, ClassVar
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
from securesystemslib.signer import Signer
|
||||
|
|
@ -147,7 +147,7 @@ def _modify_repository_root(
|
|||
)
|
||||
)
|
||||
|
||||
def _assert_files(self, roles: List[str]) -> None:
|
||||
def _assert_files(self, roles: list[str]) -> None:
|
||||
"""Assert that local metadata files exist for 'roles'"""
|
||||
expected_files = [f"{role}.json" for role in roles]
|
||||
client_files = sorted(os.listdir(self.client_directory))
|
||||
|
|
|
|||
|
|
@ -9,8 +9,9 @@
|
|||
import sys
|
||||
import tempfile
|
||||
import unittest
|
||||
from collections.abc import Iterable
|
||||
from datetime import timezone
|
||||
from typing import Iterable, Optional
|
||||
from typing import Optional
|
||||
from unittest.mock import MagicMock, call, patch
|
||||
|
||||
import freezegun
|
||||
|
|
|
|||
|
|
@ -30,8 +30,9 @@
|
|||
import time
|
||||
import unittest
|
||||
import warnings
|
||||
from collections.abc import Iterator
|
||||
from contextlib import contextmanager
|
||||
from typing import IO, Any, Callable, Dict, Iterator, List, Optional
|
||||
from typing import IO, Any, Callable, Optional
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
|
@ -42,7 +43,7 @@
|
|||
TEST_HOST_ADDRESS = "127.0.0.1"
|
||||
|
||||
# DataSet is only here so type hints can be used.
|
||||
DataSet = Dict[str, Any]
|
||||
DataSet = dict[str, Any]
|
||||
|
||||
|
||||
# Test runner decorator: Runs the test as a set of N SubTests,
|
||||
|
|
@ -131,7 +132,7 @@ def wait_for_server(
|
|||
)
|
||||
|
||||
|
||||
def configure_test_logging(argv: List[str]) -> None:
|
||||
def configure_test_logging(argv: list[str]) -> None:
|
||||
"""Configure logger level for a certain test file"""
|
||||
# parse arguments but only handle '-v': argv may contain
|
||||
# other things meant for unittest argument parser
|
||||
|
|
@ -184,12 +185,12 @@ def __init__(
|
|||
server: str = os.path.join(TESTS_DIR, "simple_server.py"),
|
||||
timeout: int = 10,
|
||||
popen_cwd: str = ".",
|
||||
extra_cmd_args: Optional[List[str]] = None,
|
||||
extra_cmd_args: Optional[list[str]] = None,
|
||||
):
|
||||
self.server = server
|
||||
self.__logger = log
|
||||
# Stores popped messages from the queue.
|
||||
self.__logged_messages: List[str] = []
|
||||
self.__logged_messages: list[str] = []
|
||||
self.__server_process: Optional[subprocess.Popen] = None
|
||||
self._log_queue: Optional[queue.Queue] = None
|
||||
self.port = -1
|
||||
|
|
@ -205,7 +206,7 @@ def __init__(
|
|||
raise e
|
||||
|
||||
def _start_server(
|
||||
self, timeout: int, extra_cmd_args: List[str], popen_cwd: str
|
||||
self, timeout: int, extra_cmd_args: list[str], popen_cwd: str
|
||||
) -> None:
|
||||
"""
|
||||
Start the server subprocess and a thread
|
||||
|
|
@ -220,7 +221,7 @@ def _start_server(
|
|||
|
||||
self.__logger.info("%s serving on %d", self.server, self.port)
|
||||
|
||||
def _start_process(self, extra_cmd_args: List[str], popen_cwd: str) -> None:
|
||||
def _start_process(self, extra_cmd_args: list[str], popen_cwd: str) -> None:
|
||||
"""Starts the process running the server."""
|
||||
|
||||
# The "-u" option forces stdin, stdout and stderr to be unbuffered.
|
||||
|
|
|
|||
|
|
@ -8,17 +8,14 @@
|
|||
import fnmatch
|
||||
import io
|
||||
import logging
|
||||
from collections.abc import Iterator
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime, timezone
|
||||
from typing import (
|
||||
IO,
|
||||
Any,
|
||||
ClassVar,
|
||||
Dict,
|
||||
Iterator,
|
||||
List,
|
||||
Optional,
|
||||
Tuple,
|
||||
TypeVar,
|
||||
Union,
|
||||
)
|
||||
|
|
@ -103,7 +100,7 @@ def __init__(
|
|||
version: Optional[int],
|
||||
spec_version: Optional[str],
|
||||
expires: Optional[datetime],
|
||||
unrecognized_fields: Optional[Dict[str, Any]],
|
||||
unrecognized_fields: Optional[dict[str, Any]],
|
||||
):
|
||||
if spec_version is None:
|
||||
spec_version = ".".join(SPECIFICATION_VERSION)
|
||||
|
|
@ -146,13 +143,13 @@ def __eq__(self, other: object) -> bool:
|
|||
)
|
||||
|
||||
@abc.abstractmethod
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
def to_dict(self) -> dict[str, Any]:
|
||||
"""Serialize and return a dict representation of self."""
|
||||
raise NotImplementedError
|
||||
|
||||
@classmethod
|
||||
@abc.abstractmethod
|
||||
def from_dict(cls, signed_dict: Dict[str, Any]) -> "Signed":
|
||||
def from_dict(cls, signed_dict: dict[str, Any]) -> "Signed":
|
||||
"""Deserialization helper, creates object from json/dict
|
||||
representation.
|
||||
"""
|
||||
|
|
@ -160,8 +157,8 @@ def from_dict(cls, signed_dict: Dict[str, Any]) -> "Signed":
|
|||
|
||||
@classmethod
|
||||
def _common_fields_from_dict(
|
||||
cls, signed_dict: Dict[str, Any]
|
||||
) -> Tuple[int, str, datetime]:
|
||||
cls, signed_dict: dict[str, Any]
|
||||
) -> tuple[int, str, datetime]:
|
||||
"""Return common fields of ``Signed`` instances from the passed dict
|
||||
representation, and returns an ordered list to be passed as leading
|
||||
positional arguments to a subclass constructor.
|
||||
|
|
@ -186,7 +183,7 @@ def _common_fields_from_dict(
|
|||
|
||||
return version, spec_version, expires
|
||||
|
||||
def _common_fields_to_dict(self) -> Dict[str, Any]:
|
||||
def _common_fields_to_dict(self) -> dict[str, Any]:
|
||||
"""Return a dict representation of common fields of
|
||||
``Signed`` instances.
|
||||
|
||||
|
|
@ -238,9 +235,9 @@ class Role:
|
|||
|
||||
def __init__(
|
||||
self,
|
||||
keyids: List[str],
|
||||
keyids: list[str],
|
||||
threshold: int,
|
||||
unrecognized_fields: Optional[Dict[str, Any]] = None,
|
||||
unrecognized_fields: Optional[dict[str, Any]] = None,
|
||||
):
|
||||
if len(set(keyids)) != len(keyids):
|
||||
raise ValueError(f"Nonunique keyids: {keyids}")
|
||||
|
|
@ -264,7 +261,7 @@ def __eq__(self, other: object) -> bool:
|
|||
)
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, role_dict: Dict[str, Any]) -> "Role":
|
||||
def from_dict(cls, role_dict: dict[str, Any]) -> "Role":
|
||||
"""Create ``Role`` object from its json/dict representation.
|
||||
|
||||
Raises:
|
||||
|
|
@ -275,7 +272,7 @@ def from_dict(cls, role_dict: Dict[str, Any]) -> "Role":
|
|||
# All fields left in the role_dict are unrecognized.
|
||||
return cls(keyids, threshold, role_dict)
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
def to_dict(self) -> dict[str, Any]:
|
||||
"""Return the dictionary representation of self."""
|
||||
return {
|
||||
"keyids": self.keyids,
|
||||
|
|
@ -295,8 +292,8 @@ class VerificationResult:
|
|||
"""
|
||||
|
||||
threshold: int
|
||||
signed: Dict[str, Key]
|
||||
unsigned: Dict[str, Key]
|
||||
signed: dict[str, Key]
|
||||
unsigned: dict[str, Key]
|
||||
|
||||
def __bool__(self) -> bool:
|
||||
return self.verified
|
||||
|
|
@ -343,22 +340,20 @@ def verified(self) -> bool:
|
|||
return self.first.verified and self.second.verified
|
||||
|
||||
@property
|
||||
def signed(self) -> Dict[str, Key]:
|
||||
def signed(self) -> dict[str, Key]:
|
||||
"""Dictionary of all signing keys that have signed, from both
|
||||
VerificationResults.
|
||||
return a union of all signed (in python<3.9 this requires
|
||||
dict unpacking)
|
||||
return a union of all signed.
|
||||
"""
|
||||
return {**self.first.signed, **self.second.signed}
|
||||
return self.first.signed | self.second.signed
|
||||
|
||||
@property
|
||||
def unsigned(self) -> Dict[str, Key]:
|
||||
def unsigned(self) -> dict[str, Key]:
|
||||
"""Dictionary of all signing keys that have not signed, from both
|
||||
VerificationResults.
|
||||
return a union of all unsigned (in python<3.9 this requires
|
||||
dict unpacking)
|
||||
return a union of all unsigned.
|
||||
"""
|
||||
return {**self.first.unsigned, **self.second.unsigned}
|
||||
return self.first.unsigned | self.second.unsigned
|
||||
|
||||
|
||||
class _DelegatorMixin(metaclass=abc.ABCMeta):
|
||||
|
|
@ -384,7 +379,7 @@ def get_verification_result(
|
|||
self,
|
||||
delegated_role: str,
|
||||
payload: bytes,
|
||||
signatures: Dict[str, Signature],
|
||||
signatures: dict[str, Signature],
|
||||
) -> VerificationResult:
|
||||
"""Return signature threshold verification result for delegated role.
|
||||
|
||||
|
|
@ -430,7 +425,7 @@ def verify_delegate(
|
|||
self,
|
||||
delegated_role: str,
|
||||
payload: bytes,
|
||||
signatures: Dict[str, Signature],
|
||||
signatures: dict[str, Signature],
|
||||
) -> None:
|
||||
"""Verify signature threshold for delegated role.
|
||||
|
||||
|
|
@ -489,10 +484,10 @@ def __init__(
|
|||
version: Optional[int] = None,
|
||||
spec_version: Optional[str] = None,
|
||||
expires: Optional[datetime] = None,
|
||||
keys: Optional[Dict[str, Key]] = None,
|
||||
roles: Optional[Dict[str, Role]] = None,
|
||||
keys: Optional[dict[str, Key]] = None,
|
||||
roles: Optional[dict[str, Role]] = None,
|
||||
consistent_snapshot: Optional[bool] = True,
|
||||
unrecognized_fields: Optional[Dict[str, Any]] = None,
|
||||
unrecognized_fields: Optional[dict[str, Any]] = None,
|
||||
):
|
||||
super().__init__(version, spec_version, expires, unrecognized_fields)
|
||||
self.consistent_snapshot = consistent_snapshot
|
||||
|
|
@ -516,7 +511,7 @@ def __eq__(self, other: object) -> bool:
|
|||
)
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, signed_dict: Dict[str, Any]) -> "Root":
|
||||
def from_dict(cls, signed_dict: dict[str, Any]) -> "Root":
|
||||
"""Create ``Root`` object from its json/dict representation.
|
||||
|
||||
Raises:
|
||||
|
|
@ -535,7 +530,7 @@ def from_dict(cls, signed_dict: Dict[str, Any]) -> "Root":
|
|||
# All fields left in the signed_dict are unrecognized.
|
||||
return cls(*common_args, keys, roles, consistent_snapshot, signed_dict)
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
def to_dict(self) -> dict[str, Any]:
|
||||
"""Return the dict representation of self."""
|
||||
root_dict = self._common_fields_to_dict()
|
||||
keys = {keyid: key.to_dict() for (keyid, key) in self.keys.items()}
|
||||
|
|
@ -616,7 +611,7 @@ def get_root_verification_result(
|
|||
self,
|
||||
previous: Optional["Root"],
|
||||
payload: bytes,
|
||||
signatures: Dict[str, Signature],
|
||||
signatures: dict[str, Signature],
|
||||
) -> RootVerificationResult:
|
||||
"""Return signature threshold verification result for two root roles.
|
||||
|
||||
|
|
@ -661,7 +656,7 @@ class BaseFile:
|
|||
|
||||
@staticmethod
|
||||
def _verify_hashes(
|
||||
data: Union[bytes, IO[bytes]], expected_hashes: Dict[str, str]
|
||||
data: Union[bytes, IO[bytes]], expected_hashes: dict[str, str]
|
||||
) -> None:
|
||||
"""Verify that the hash of ``data`` matches ``expected_hashes``."""
|
||||
is_bytes = isinstance(data, bytes)
|
||||
|
|
@ -707,7 +702,7 @@ def _verify_length(
|
|||
)
|
||||
|
||||
@staticmethod
|
||||
def _validate_hashes(hashes: Dict[str, str]) -> None:
|
||||
def _validate_hashes(hashes: dict[str, str]) -> None:
|
||||
if not hashes:
|
||||
raise ValueError("Hashes must be a non empty dictionary")
|
||||
for key, value in hashes.items():
|
||||
|
|
@ -721,8 +716,8 @@ def _validate_length(length: int) -> None:
|
|||
|
||||
@staticmethod
|
||||
def _get_length_and_hashes(
|
||||
data: Union[bytes, IO[bytes]], hash_algorithms: Optional[List[str]]
|
||||
) -> Tuple[int, Dict[str, str]]:
|
||||
data: Union[bytes, IO[bytes]], hash_algorithms: Optional[list[str]]
|
||||
) -> tuple[int, dict[str, str]]:
|
||||
"""Calculate length and hashes of ``data``."""
|
||||
if isinstance(data, bytes):
|
||||
length = len(data)
|
||||
|
|
@ -777,8 +772,8 @@ def __init__(
|
|||
self,
|
||||
version: int = 1,
|
||||
length: Optional[int] = None,
|
||||
hashes: Optional[Dict[str, str]] = None,
|
||||
unrecognized_fields: Optional[Dict[str, Any]] = None,
|
||||
hashes: Optional[dict[str, str]] = None,
|
||||
unrecognized_fields: Optional[dict[str, Any]] = None,
|
||||
):
|
||||
if version <= 0:
|
||||
raise ValueError(f"Metafile version must be > 0, got {version}")
|
||||
|
|
@ -807,7 +802,7 @@ def __eq__(self, other: object) -> bool:
|
|||
)
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, meta_dict: Dict[str, Any]) -> "MetaFile":
|
||||
def from_dict(cls, meta_dict: dict[str, Any]) -> "MetaFile":
|
||||
"""Create ``MetaFile`` object from its json/dict representation.
|
||||
|
||||
Raises:
|
||||
|
|
@ -825,7 +820,7 @@ def from_data(
|
|||
cls,
|
||||
version: int,
|
||||
data: Union[bytes, IO[bytes]],
|
||||
hash_algorithms: List[str],
|
||||
hash_algorithms: list[str],
|
||||
) -> "MetaFile":
|
||||
"""Creates MetaFile object from bytes.
|
||||
This constructor should only be used if hashes are wanted.
|
||||
|
|
@ -843,9 +838,9 @@ def from_data(
|
|||
length, hashes = cls._get_length_and_hashes(data, hash_algorithms)
|
||||
return cls(version, length, hashes)
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
def to_dict(self) -> dict[str, Any]:
|
||||
"""Return the dictionary representation of self."""
|
||||
res_dict: Dict[str, Any] = {
|
||||
res_dict: dict[str, Any] = {
|
||||
"version": self.version,
|
||||
**self.unrecognized_fields,
|
||||
}
|
||||
|
|
@ -907,7 +902,7 @@ def __init__(
|
|||
spec_version: Optional[str] = None,
|
||||
expires: Optional[datetime] = None,
|
||||
snapshot_meta: Optional[MetaFile] = None,
|
||||
unrecognized_fields: Optional[Dict[str, Any]] = None,
|
||||
unrecognized_fields: Optional[dict[str, Any]] = None,
|
||||
):
|
||||
super().__init__(version, spec_version, expires, unrecognized_fields)
|
||||
self.snapshot_meta = snapshot_meta or MetaFile(1)
|
||||
|
|
@ -921,7 +916,7 @@ def __eq__(self, other: object) -> bool:
|
|||
)
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, signed_dict: Dict[str, Any]) -> "Timestamp":
|
||||
def from_dict(cls, signed_dict: dict[str, Any]) -> "Timestamp":
|
||||
"""Create ``Timestamp`` object from its json/dict representation.
|
||||
|
||||
Raises:
|
||||
|
|
@ -933,7 +928,7 @@ def from_dict(cls, signed_dict: Dict[str, Any]) -> "Timestamp":
|
|||
# All fields left in the timestamp_dict are unrecognized.
|
||||
return cls(*common_args, snapshot_meta, signed_dict)
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
def to_dict(self) -> dict[str, Any]:
|
||||
"""Return the dict representation of self."""
|
||||
res_dict = self._common_fields_to_dict()
|
||||
res_dict["meta"] = {"snapshot.json": self.snapshot_meta.to_dict()}
|
||||
|
|
@ -969,8 +964,8 @@ def __init__(
|
|||
version: Optional[int] = None,
|
||||
spec_version: Optional[str] = None,
|
||||
expires: Optional[datetime] = None,
|
||||
meta: Optional[Dict[str, MetaFile]] = None,
|
||||
unrecognized_fields: Optional[Dict[str, Any]] = None,
|
||||
meta: Optional[dict[str, MetaFile]] = None,
|
||||
unrecognized_fields: Optional[dict[str, Any]] = None,
|
||||
):
|
||||
super().__init__(version, spec_version, expires, unrecognized_fields)
|
||||
self.meta = meta if meta is not None else {"targets.json": MetaFile(1)}
|
||||
|
|
@ -982,7 +977,7 @@ def __eq__(self, other: object) -> bool:
|
|||
return super().__eq__(other) and self.meta == other.meta
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, signed_dict: Dict[str, Any]) -> "Snapshot":
|
||||
def from_dict(cls, signed_dict: dict[str, Any]) -> "Snapshot":
|
||||
"""Create ``Snapshot`` object from its json/dict representation.
|
||||
|
||||
Raises:
|
||||
|
|
@ -996,7 +991,7 @@ def from_dict(cls, signed_dict: Dict[str, Any]) -> "Snapshot":
|
|||
# All fields left in the snapshot_dict are unrecognized.
|
||||
return cls(*common_args, meta, signed_dict)
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
def to_dict(self) -> dict[str, Any]:
|
||||
"""Return the dict representation of self."""
|
||||
snapshot_dict = self._common_fields_to_dict()
|
||||
meta_dict = {}
|
||||
|
|
@ -1040,12 +1035,12 @@ class DelegatedRole(Role):
|
|||
def __init__(
|
||||
self,
|
||||
name: str,
|
||||
keyids: List[str],
|
||||
keyids: list[str],
|
||||
threshold: int,
|
||||
terminating: bool,
|
||||
paths: Optional[List[str]] = None,
|
||||
path_hash_prefixes: Optional[List[str]] = None,
|
||||
unrecognized_fields: Optional[Dict[str, Any]] = None,
|
||||
paths: Optional[list[str]] = None,
|
||||
path_hash_prefixes: Optional[list[str]] = None,
|
||||
unrecognized_fields: Optional[dict[str, Any]] = None,
|
||||
):
|
||||
super().__init__(keyids, threshold, unrecognized_fields)
|
||||
self.name = name
|
||||
|
|
@ -1079,7 +1074,7 @@ def __eq__(self, other: object) -> bool:
|
|||
)
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, role_dict: Dict[str, Any]) -> "DelegatedRole":
|
||||
def from_dict(cls, role_dict: dict[str, Any]) -> "DelegatedRole":
|
||||
"""Create ``DelegatedRole`` object from its json/dict representation.
|
||||
|
||||
Raises:
|
||||
|
|
@ -1102,7 +1097,7 @@ def from_dict(cls, role_dict: Dict[str, Any]) -> "DelegatedRole":
|
|||
role_dict,
|
||||
)
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
def to_dict(self) -> dict[str, Any]:
|
||||
"""Return the dict representation of self."""
|
||||
base_role_dict = super().to_dict()
|
||||
res_dict = {
|
||||
|
|
@ -1201,11 +1196,11 @@ class SuccinctRoles(Role):
|
|||
|
||||
def __init__(
|
||||
self,
|
||||
keyids: List[str],
|
||||
keyids: list[str],
|
||||
threshold: int,
|
||||
bit_length: int,
|
||||
name_prefix: str,
|
||||
unrecognized_fields: Optional[Dict[str, Any]] = None,
|
||||
unrecognized_fields: Optional[dict[str, Any]] = None,
|
||||
) -> None:
|
||||
super().__init__(keyids, threshold, unrecognized_fields)
|
||||
|
||||
|
|
@ -1237,7 +1232,7 @@ def __eq__(self, other: object) -> bool:
|
|||
)
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, role_dict: Dict[str, Any]) -> "SuccinctRoles":
|
||||
def from_dict(cls, role_dict: dict[str, Any]) -> "SuccinctRoles":
|
||||
"""Create ``SuccinctRoles`` object from its json/dict representation.
|
||||
|
||||
Raises:
|
||||
|
|
@ -1250,7 +1245,7 @@ def from_dict(cls, role_dict: Dict[str, Any]) -> "SuccinctRoles":
|
|||
# All fields left in the role_dict are unrecognized.
|
||||
return cls(keyids, threshold, bit_length, name_prefix, role_dict)
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
def to_dict(self) -> dict[str, Any]:
|
||||
"""Return the dict representation of self."""
|
||||
base_role_dict = super().to_dict()
|
||||
return {
|
||||
|
|
@ -1344,10 +1339,10 @@ class Delegations:
|
|||
|
||||
def __init__(
|
||||
self,
|
||||
keys: Dict[str, Key],
|
||||
roles: Optional[Dict[str, DelegatedRole]] = None,
|
||||
keys: dict[str, Key],
|
||||
roles: Optional[dict[str, DelegatedRole]] = None,
|
||||
succinct_roles: Optional[SuccinctRoles] = None,
|
||||
unrecognized_fields: Optional[Dict[str, Any]] = None,
|
||||
unrecognized_fields: Optional[dict[str, Any]] = None,
|
||||
):
|
||||
self.keys = keys
|
||||
if sum(1 for v in [roles, succinct_roles] if v is not None) != 1:
|
||||
|
|
@ -1389,7 +1384,7 @@ def __eq__(self, other: object) -> bool:
|
|||
return all_attributes_check
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, delegations_dict: Dict[str, Any]) -> "Delegations":
|
||||
def from_dict(cls, delegations_dict: dict[str, Any]) -> "Delegations":
|
||||
"""Create ``Delegations`` object from its json/dict representation.
|
||||
|
||||
Raises:
|
||||
|
|
@ -1400,7 +1395,7 @@ def from_dict(cls, delegations_dict: Dict[str, Any]) -> "Delegations":
|
|||
for keyid, key_dict in keys.items():
|
||||
keys_res[keyid] = Key.from_dict(keyid, key_dict)
|
||||
roles = delegations_dict.pop("roles", None)
|
||||
roles_res: Optional[Dict[str, DelegatedRole]] = None
|
||||
roles_res: Optional[dict[str, DelegatedRole]] = None
|
||||
|
||||
if roles is not None:
|
||||
roles_res = {}
|
||||
|
|
@ -1418,10 +1413,10 @@ def from_dict(cls, delegations_dict: Dict[str, Any]) -> "Delegations":
|
|||
# All fields left in the delegations_dict are unrecognized.
|
||||
return cls(keys_res, roles_res, succinct_roles_info, delegations_dict)
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
def to_dict(self) -> dict[str, Any]:
|
||||
"""Return the dict representation of self."""
|
||||
keys = {keyid: key.to_dict() for keyid, key in self.keys.items()}
|
||||
res_dict: Dict[str, Any] = {
|
||||
res_dict: dict[str, Any] = {
|
||||
"keys": keys,
|
||||
**self.unrecognized_fields,
|
||||
}
|
||||
|
|
@ -1435,7 +1430,7 @@ def to_dict(self) -> Dict[str, Any]:
|
|||
|
||||
def get_roles_for_target(
|
||||
self, target_filepath: str
|
||||
) -> Iterator[Tuple[str, bool]]:
|
||||
) -> Iterator[tuple[str, bool]]:
|
||||
"""Given ``target_filepath`` get names and terminating status of all
|
||||
delegated roles who are responsible for it.
|
||||
|
||||
|
|
@ -1475,9 +1470,9 @@ class TargetFile(BaseFile):
|
|||
def __init__(
|
||||
self,
|
||||
length: int,
|
||||
hashes: Dict[str, str],
|
||||
hashes: dict[str, str],
|
||||
path: str,
|
||||
unrecognized_fields: Optional[Dict[str, Any]] = None,
|
||||
unrecognized_fields: Optional[dict[str, Any]] = None,
|
||||
):
|
||||
self._validate_length(length)
|
||||
self._validate_hashes(hashes)
|
||||
|
|
@ -1510,7 +1505,7 @@ def __eq__(self, other: object) -> bool:
|
|||
)
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, target_dict: Dict[str, Any], path: str) -> "TargetFile":
|
||||
def from_dict(cls, target_dict: dict[str, Any], path: str) -> "TargetFile":
|
||||
"""Create ``TargetFile`` object from its json/dict representation.
|
||||
|
||||
Raises:
|
||||
|
|
@ -1522,7 +1517,7 @@ def from_dict(cls, target_dict: Dict[str, Any], path: str) -> "TargetFile":
|
|||
# All fields left in the target_dict are unrecognized.
|
||||
return cls(length, hashes, path, target_dict)
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
def to_dict(self) -> dict[str, Any]:
|
||||
"""Return the JSON-serializable dictionary representation of self."""
|
||||
return {
|
||||
"length": self.length,
|
||||
|
|
@ -1535,7 +1530,7 @@ def from_file(
|
|||
cls,
|
||||
target_file_path: str,
|
||||
local_path: str,
|
||||
hash_algorithms: Optional[List[str]] = None,
|
||||
hash_algorithms: Optional[list[str]] = None,
|
||||
) -> "TargetFile":
|
||||
"""Create ``TargetFile`` object from a file.
|
||||
|
||||
|
|
@ -1559,7 +1554,7 @@ def from_data(
|
|||
cls,
|
||||
target_file_path: str,
|
||||
data: Union[bytes, IO[bytes]],
|
||||
hash_algorithms: Optional[List[str]] = None,
|
||||
hash_algorithms: Optional[list[str]] = None,
|
||||
) -> "TargetFile":
|
||||
"""Create ``TargetFile`` object from bytes.
|
||||
|
||||
|
|
@ -1590,7 +1585,7 @@ def verify_length_and_hashes(self, data: Union[bytes, IO[bytes]]) -> None:
|
|||
self._verify_length(data, self.length)
|
||||
self._verify_hashes(data, self.hashes)
|
||||
|
||||
def get_prefixed_paths(self) -> List[str]:
|
||||
def get_prefixed_paths(self) -> list[str]:
|
||||
"""
|
||||
Return hash-prefixed URL path fragments for the target file path.
|
||||
"""
|
||||
|
|
@ -1634,9 +1629,9 @@ def __init__(
|
|||
version: Optional[int] = None,
|
||||
spec_version: Optional[str] = None,
|
||||
expires: Optional[datetime] = None,
|
||||
targets: Optional[Dict[str, TargetFile]] = None,
|
||||
targets: Optional[dict[str, TargetFile]] = None,
|
||||
delegations: Optional[Delegations] = None,
|
||||
unrecognized_fields: Optional[Dict[str, Any]] = None,
|
||||
unrecognized_fields: Optional[dict[str, Any]] = None,
|
||||
) -> None:
|
||||
super().__init__(version, spec_version, expires, unrecognized_fields)
|
||||
self.targets = targets if targets is not None else {}
|
||||
|
|
@ -1653,7 +1648,7 @@ def __eq__(self, other: object) -> bool:
|
|||
)
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, signed_dict: Dict[str, Any]) -> "Targets":
|
||||
def from_dict(cls, signed_dict: dict[str, Any]) -> "Targets":
|
||||
"""Create ``Targets`` object from its json/dict representation.
|
||||
|
||||
Raises:
|
||||
|
|
@ -1675,7 +1670,7 @@ def from_dict(cls, signed_dict: Dict[str, Any]) -> "Targets":
|
|||
# All fields left in the targets_dict are unrecognized.
|
||||
return cls(*common_args, res_targets, delegations, signed_dict)
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
def to_dict(self) -> dict[str, Any]:
|
||||
"""Return the dict representation of self."""
|
||||
targets_dict = self._common_fields_to_dict()
|
||||
targets = {}
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
"""Low-level TUF DSSE API. (experimental!)"""
|
||||
|
||||
import json
|
||||
from typing import Generic, Type, cast
|
||||
from typing import Generic, cast
|
||||
|
||||
from securesystemslib.dsse import Envelope as BaseSimpleEnvelope
|
||||
|
||||
|
|
@ -135,7 +135,7 @@ def get_signed(self) -> T:
|
|||
# TODO: can we move this to tuf.api._payload?
|
||||
_type = payload_dict["_type"]
|
||||
if _type == _TARGETS:
|
||||
inner_cls: Type[Signed] = Targets
|
||||
inner_cls: type[Signed] = Targets
|
||||
elif _type == _SNAPSHOT:
|
||||
inner_cls = Snapshot
|
||||
elif _type == _TIMESTAMP:
|
||||
|
|
|
|||
|
|
@ -32,7 +32,7 @@
|
|||
|
||||
import logging
|
||||
import tempfile
|
||||
from typing import Any, Dict, Generic, Optional, Type, cast
|
||||
from typing import Any, Generic, Optional, cast
|
||||
|
||||
from securesystemslib.signer import Signature, Signer
|
||||
from securesystemslib.storage import FilesystemBackend, StorageBackendInterface
|
||||
|
|
@ -121,8 +121,8 @@ class Metadata(Generic[T]):
|
|||
def __init__(
|
||||
self,
|
||||
signed: T,
|
||||
signatures: Optional[Dict[str, Signature]] = None,
|
||||
unrecognized_fields: Optional[Dict[str, Any]] = None,
|
||||
signatures: Optional[dict[str, Signature]] = None,
|
||||
unrecognized_fields: Optional[dict[str, Any]] = None,
|
||||
):
|
||||
self.signed: T = signed
|
||||
self.signatures = signatures if signatures is not None else {}
|
||||
|
|
@ -153,7 +153,7 @@ def signed_bytes(self) -> bytes:
|
|||
return CanonicalJSONSerializer().serialize(self.signed)
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, metadata: Dict[str, Any]) -> "Metadata[T]":
|
||||
def from_dict(cls, metadata: dict[str, Any]) -> "Metadata[T]":
|
||||
"""Create ``Metadata`` object from its json/dict representation.
|
||||
|
||||
Args:
|
||||
|
|
@ -173,7 +173,7 @@ def from_dict(cls, metadata: Dict[str, Any]) -> "Metadata[T]":
|
|||
_type = metadata["signed"]["_type"]
|
||||
|
||||
if _type == _TARGETS:
|
||||
inner_cls: Type[Signed] = Targets
|
||||
inner_cls: type[Signed] = Targets
|
||||
elif _type == _SNAPSHOT:
|
||||
inner_cls = Snapshot
|
||||
elif _type == _TIMESTAMP:
|
||||
|
|
@ -184,7 +184,7 @@ def from_dict(cls, metadata: Dict[str, Any]) -> "Metadata[T]":
|
|||
raise ValueError(f'unrecognized metadata type "{_type}"')
|
||||
|
||||
# Make sure signatures are unique
|
||||
signatures: Dict[str, Signature] = {}
|
||||
signatures: dict[str, Signature] = {}
|
||||
for sig_dict in metadata.pop("signatures"):
|
||||
sig = Signature.from_dict(sig_dict)
|
||||
if sig.keyid in signatures:
|
||||
|
|
@ -292,7 +292,7 @@ def to_bytes(
|
|||
|
||||
return serializer.serialize(self)
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
def to_dict(self) -> dict[str, Any]:
|
||||
"""Return the dict representation of self."""
|
||||
|
||||
signatures = [sig.to_dict() for sig in self.signatures.values()]
|
||||
|
|
|
|||
|
|
@ -10,7 +10,8 @@
|
|||
# can be moved out of _internal once sigstore-python 1.0 is not relevant.
|
||||
|
||||
import logging
|
||||
from typing import Dict, Iterator, Optional, Tuple
|
||||
from collections.abc import Iterator
|
||||
from typing import Optional
|
||||
from urllib import parse
|
||||
|
||||
# Imports
|
||||
|
|
@ -54,7 +55,7 @@ def __init__(
|
|||
# improve efficiency, but avoiding sharing state between different
|
||||
# hosts-scheme combinations to minimize subtle security issues.
|
||||
# Some cookies may not be HTTP-safe.
|
||||
self._sessions: Dict[Tuple[str, str], requests.Session] = {}
|
||||
self._sessions: dict[tuple[str, str], requests.Session] = {}
|
||||
|
||||
# Default settings
|
||||
self.socket_timeout: int = socket_timeout # seconds
|
||||
|
|
|
|||
|
|
@ -64,7 +64,8 @@
|
|||
import datetime
|
||||
import logging
|
||||
from collections import abc
|
||||
from typing import Dict, Iterator, Optional, Tuple, Type, Union, cast
|
||||
from collections.abc import Iterator
|
||||
from typing import Optional, Union, cast
|
||||
|
||||
from securesystemslib.signer import Signature
|
||||
|
||||
|
|
@ -109,7 +110,7 @@ def __init__(self, root_data: bytes, envelope_type: EnvelopeType):
|
|||
RepositoryError: Metadata failed to load or verify. The actual
|
||||
error type and content will contain more details.
|
||||
"""
|
||||
self._trusted_set: Dict[str, Signed] = {}
|
||||
self._trusted_set: dict[str, Signed] = {}
|
||||
self.reference_time = datetime.datetime.now(datetime.timezone.utc)
|
||||
|
||||
if envelope_type is EnvelopeType.SIMPLE:
|
||||
|
|
@ -450,11 +451,11 @@ def _load_trusted_root(self, data: bytes) -> None:
|
|||
|
||||
|
||||
def _load_from_metadata(
|
||||
role: Type[T],
|
||||
role: type[T],
|
||||
data: bytes,
|
||||
delegator: Optional[Delegator] = None,
|
||||
role_name: Optional[str] = None,
|
||||
) -> Tuple[T, bytes, Dict[str, Signature]]:
|
||||
) -> tuple[T, bytes, dict[str, Signature]]:
|
||||
"""Load traditional metadata bytes, and extract and verify payload.
|
||||
|
||||
If no delegator is passed, verification is skipped. Returns a tuple of
|
||||
|
|
@ -477,11 +478,11 @@ def _load_from_metadata(
|
|||
|
||||
|
||||
def _load_from_simple_envelope(
|
||||
role: Type[T],
|
||||
role: type[T],
|
||||
data: bytes,
|
||||
delegator: Optional[Delegator] = None,
|
||||
role_name: Optional[str] = None,
|
||||
) -> Tuple[T, bytes, Dict[str, Signature]]:
|
||||
) -> tuple[T, bytes, dict[str, Signature]]:
|
||||
"""Load simple envelope bytes, and extract and verify payload.
|
||||
|
||||
If no delegator is passed, verification is skipped. Returns a tuple of
|
||||
|
|
|
|||
|
|
@ -7,8 +7,9 @@
|
|||
import abc
|
||||
import logging
|
||||
import tempfile
|
||||
from collections.abc import Iterator
|
||||
from contextlib import contextmanager
|
||||
from typing import IO, Iterator
|
||||
from typing import IO
|
||||
|
||||
from tuf.api import exceptions
|
||||
|
||||
|
|
|
|||
|
|
@ -42,7 +42,7 @@
|
|||
import os
|
||||
import shutil
|
||||
import tempfile
|
||||
from typing import Optional, Set, cast
|
||||
from typing import Optional, cast
|
||||
from urllib import parse
|
||||
|
||||
from tuf.api import exceptions
|
||||
|
|
@ -430,7 +430,7 @@ def _preorder_depth_first_walk(
|
|||
# List of delegations to be interrogated. A (role, parent role) pair
|
||||
# is needed to load and verify the delegated targets metadata.
|
||||
delegations_to_visit = [(Targets.type, Root.type)]
|
||||
visited_role_names: Set[str] = set()
|
||||
visited_role_names: set[str] = set()
|
||||
|
||||
# Preorder depth-first traversal of the graph of target delegations.
|
||||
while (
|
||||
|
|
|
|||
|
|
@ -5,9 +5,10 @@
|
|||
|
||||
import logging
|
||||
from abc import ABC, abstractmethod
|
||||
from collections.abc import Generator
|
||||
from contextlib import contextmanager, suppress
|
||||
from copy import deepcopy
|
||||
from typing import Dict, Generator, Optional, Tuple
|
||||
from typing import Optional
|
||||
|
||||
from tuf.api.exceptions import UnsignedMetadataError
|
||||
from tuf.api.metadata import (
|
||||
|
|
@ -63,7 +64,7 @@ def close(self, role: str, md: Metadata) -> None:
|
|||
raise NotImplementedError
|
||||
|
||||
@property
|
||||
def targets_infos(self) -> Dict[str, MetaFile]:
|
||||
def targets_infos(self) -> dict[str, MetaFile]:
|
||||
"""Returns the MetaFiles for current targets metadatas
|
||||
|
||||
This property is used by do_snapshot() to update Snapshot.meta:
|
||||
|
|
@ -168,7 +169,7 @@ def targets(self, rolename: str = Targets.type) -> Targets:
|
|||
|
||||
def do_snapshot(
|
||||
self, force: bool = False
|
||||
) -> Tuple[bool, Dict[str, MetaFile]]:
|
||||
) -> tuple[bool, dict[str, MetaFile]]:
|
||||
"""Update snapshot meta information
|
||||
|
||||
Updates the snapshot meta information according to current targets
|
||||
|
|
@ -187,7 +188,7 @@ def do_snapshot(
|
|||
# * any targets files are not yet in snapshot or
|
||||
# * any targets version is incorrect
|
||||
update_version = force
|
||||
removed: Dict[str, MetaFile] = {}
|
||||
removed: dict[str, MetaFile] = {}
|
||||
|
||||
root = self.root()
|
||||
snapshot_md = self.open(Snapshot.type)
|
||||
|
|
@ -230,7 +231,7 @@ def do_snapshot(
|
|||
|
||||
def do_timestamp(
|
||||
self, force: bool = False
|
||||
) -> Tuple[bool, Optional[MetaFile]]:
|
||||
) -> tuple[bool, Optional[MetaFile]]:
|
||||
"""Update timestamp meta information
|
||||
|
||||
Updates timestamp according to current snapshot state
|
||||
|
|
|
|||
Loading…
Reference in a new issue