Merge pull request #2165 from jku/no-key

Move (most of) Key to Securesystemslib
This commit is contained in:
Lukas Pühringer 2023-01-31 09:20:44 +01:00 committed by GitHub
commit d2c12f2d14
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 148 additions and 284 deletions

View file

@ -61,8 +61,6 @@
# -- Autodoc configuration ---------------------------------------------------
# https://www.sphinx-doc.org/en/master/usage/extensions/autodoc.html
autodoc_mock_imports = ["securesystemslib"]
# Tone down the "tuf.api.metadata." repetition
add_module_names = False
python_use_unqualified_type_names = True

View file

@ -27,13 +27,12 @@
from typing import Any, Dict
from securesystemslib.keys import generate_ed25519_key
from securesystemslib.signer import SSlibSigner
from securesystemslib.signer import SSlibKey, SSlibSigner
from tuf.api.metadata import (
SPECIFICATION_VERSION,
DelegatedRole,
Delegations,
Key,
Metadata,
MetaFile,
Root,
@ -157,7 +156,7 @@ def _in(days: float) -> datetime:
for name in ["targets", "snapshot", "timestamp", "root"]:
keys[name] = generate_ed25519_key()
roles["root"].signed.add_key(
Key.from_securesystemslib_key(keys[name]), name
SSlibKey.from_securesystemslib_key(keys[name]), name
)
# NOTE: We only need the public part to populate root, so it is possible to use
@ -173,7 +172,7 @@ def _in(days: float) -> datetime:
# required signature threshold.
another_root_key = generate_ed25519_key()
roles["root"].signed.add_key(
Key.from_securesystemslib_key(another_root_key), "root"
SSlibKey.from_securesystemslib_key(another_root_key), "root"
)
roles["root"].signed.roles["root"].threshold = 2
@ -271,7 +270,7 @@ def _in(days: float) -> datetime:
# https://theupdateframework.github.io/specification/latest/#delegations
roles["targets"].signed.delegations = Delegations(
keys={
keys[delegatee_name]["keyid"]: Key.from_securesystemslib_key(
keys[delegatee_name]["keyid"]: SSlibKey.from_securesystemslib_key(
keys[delegatee_name]
)
},
@ -345,7 +344,7 @@ def _in(days: float) -> datetime:
roles["root"].signed.revoke_key(keys["root"]["keyid"], "root")
roles["root"].signed.add_key(
Key.from_securesystemslib_key(new_root_key), "root"
SSlibKey.from_securesystemslib_key(new_root_key), "root"
)
roles["root"].signed.version += 1

View file

@ -23,12 +23,11 @@
from typing import Any, Dict, Iterator, List, Tuple
from securesystemslib.keys import generate_ed25519_key
from securesystemslib.signer import SSlibSigner
from securesystemslib.signer import SSlibKey, SSlibSigner
from tuf.api.metadata import (
DelegatedRole,
Delegations,
Key,
Metadata,
TargetFile,
Targets,
@ -146,7 +145,7 @@ def find_hash_bin(path: str) -> str:
# Create preliminary delegating targets role (bins) and add public key for
# delegated targets (bin_n) to key store. Delegation details are update below.
roles["bins"] = Metadata(Targets(expires=_in(365)))
bin_n_key = Key.from_securesystemslib_key(keys["bin-n"])
bin_n_key = SSlibKey.from_securesystemslib_key(keys["bin-n"])
roles["bins"].signed.delegations = Delegations(
keys={bin_n_key.keyid: bin_n_key},
roles={},

View file

@ -25,7 +25,7 @@
from typing import Dict, Tuple
from securesystemslib.keys import generate_ed25519_key
from securesystemslib.signer import SSlibSigner
from securesystemslib.signer import SSlibKey, SSlibSigner
from tuf.api.metadata import (
Delegations,
@ -82,7 +82,7 @@
def create_key() -> Tuple[Key, SSlibSigner]:
"""Generates a new Key and Signer."""
sslib_key = generate_ed25519_key()
return Key.from_securesystemslib_key(sslib_key), SSlibSigner(sslib_key)
return SSlibKey.from_securesystemslib_key(sslib_key), SSlibSigner(sslib_key)
# Create one signing key for all bins, and one for the delegating targets role.

View file

@ -10,10 +10,9 @@
from typing import Dict, List
from securesystemslib import keys
from securesystemslib.signer import Signer, SSlibSigner
from securesystemslib.signer import Signer, SSlibKey, SSlibSigner
from tuf.api.metadata import (
Key,
Metadata,
MetaFile,
Root,
@ -72,7 +71,7 @@ def __init__(self) -> None:
for role in ["root", "timestamp", "snapshot", "targets"]:
key = keys.generate_ed25519_key()
self.signer_cache[role].append(SSlibSigner(key))
root.add_key(Key.from_securesystemslib_key(key), role)
root.add_key(SSlibKey.from_securesystemslib_key(key), role)
for role in ["timestamp", "snapshot", "targets"]:
with self.edit(role):

View file

@ -44,7 +44,7 @@ classifiers = [
]
dependencies = [
"requests>=2.19.1",
"securesystemslib>=0.22.0",
"securesystemslib>=0.26.0",
]
dynamic = ["version"]

View file

@ -8,7 +8,7 @@
from datetime import datetime
from typing import Dict, List, Optional
from securesystemslib.signer import SSlibSigner
from securesystemslib.signer import SSlibKey, SSlibSigner
from tests import utils
from tuf.api.metadata import Key, Metadata, Root, Snapshot, Targets, Timestamp
@ -36,7 +36,7 @@
keys: Dict[str, Key] = {}
for index in range(4):
keys[f"ed25519_{index}"] = Key.from_securesystemslib_key(
keys[f"ed25519_{index}"] = SSlibKey.from_securesystemslib_key(
{
"keytype": "ed25519",
"scheme": "ed25519",

View file

@ -54,7 +54,7 @@
import securesystemslib.hash as sslib_hash
from securesystemslib.keys import generate_ed25519_key
from securesystemslib.signer import SSlibSigner
from securesystemslib.signer import SSlibKey, SSlibSigner
from tuf.api.exceptions import DownloadHTTPError
from tuf.api.metadata import (
@ -156,8 +156,8 @@ def all_targets(self) -> Iterator[Tuple[str, Targets]]:
@staticmethod
def create_key() -> Tuple[Key, SSlibSigner]:
sslib_key = generate_ed25519_key()
return Key.from_securesystemslib_key(sslib_key), SSlibSigner(sslib_key)
key = generate_ed25519_key()
return SSlibKey.from_securesystemslib_key(key), SSlibSigner(key)
def add_signer(self, role: str, signer: SSlibSigner) -> None:
if role not in self.signers:

View file

@ -17,13 +17,14 @@
from datetime import datetime, timedelta
from typing import Any, ClassVar, Dict
from securesystemslib import exceptions as sslib_exceptions
from securesystemslib import hash as sslib_hash
from securesystemslib.interface import (
import_ed25519_privatekey_from_file,
import_ed25519_publickey_from_file,
)
from securesystemslib.keys import generate_ed25519_key
from securesystemslib.signer import Signature, SSlibSigner
from securesystemslib.signer import SSlibKey, SSlibSigner
from tests import utils
from tuf.api import exceptions
@ -34,6 +35,7 @@
Key,
Metadata,
Root,
Signature,
Snapshot,
SuccinctRoles,
TargetFile,
@ -187,8 +189,8 @@ def test_to_from_bytes(self) -> None:
self.assertEqual(metadata_obj_2.to_bytes(), obj_bytes)
def test_sign_verify(self) -> None:
root_path = os.path.join(self.repo_dir, "metadata", "root.json")
root = Metadata[Root].from_file(root_path).signed
path = os.path.join(self.repo_dir, "metadata")
root = Metadata[Root].from_file(os.path.join(path, "root.json")).signed
# Locate the public keys we need from root
targets_keyid = next(iter(root.roles[Targets.type].keyids))
@ -199,41 +201,37 @@ def test_sign_verify(self) -> None:
timestamp_key = root.keys[timestamp_keyid]
# Load sample metadata (targets) and assert ...
path = os.path.join(self.repo_dir, "metadata", "targets.json")
md_obj = Metadata.from_file(path)
md_obj = Metadata.from_file(os.path.join(path, "targets.json"))
sig = md_obj.signatures[targets_keyid]
data = CanonicalJSONSerializer().serialize(md_obj.signed)
# ... it has a single existing signature,
self.assertEqual(len(md_obj.signatures), 1)
# ... which is valid for the correct key.
targets_key.verify_signature(md_obj)
with self.assertRaises(exceptions.UnsignedMetadataError):
snapshot_key.verify_signature(md_obj)
# Test verifying with explicitly set serializer
targets_key.verify_signature(md_obj, CanonicalJSONSerializer())
with self.assertRaises(exceptions.UnsignedMetadataError):
targets_key.verify_signature(md_obj, JSONSerializer()) # type: ignore[arg-type]
targets_key.verify_signature(sig, data)
with self.assertRaises(sslib_exceptions.VerificationError):
snapshot_key.verify_signature(sig, data)
sslib_signer = SSlibSigner(self.keystore[Snapshot.type])
# Append a new signature with the unrelated key and assert that ...
sig = md_obj.sign(sslib_signer, append=True)
snapshot_sig = md_obj.sign(sslib_signer, append=True)
# ... there are now two signatures, and
self.assertEqual(len(md_obj.signatures), 2)
# ... both are valid for the corresponding keys.
targets_key.verify_signature(md_obj)
snapshot_key.verify_signature(md_obj)
targets_key.verify_signature(sig, data)
snapshot_key.verify_signature(snapshot_sig, data)
# ... the returned (appended) signature is for snapshot key
self.assertEqual(sig.keyid, snapshot_keyid)
self.assertEqual(snapshot_sig.keyid, snapshot_keyid)
sslib_signer = SSlibSigner(self.keystore[Timestamp.type])
# Create and assign (don't append) a new signature and assert that ...
md_obj.sign(sslib_signer, append=False)
ts_sig = md_obj.sign(sslib_signer, append=False)
# ... there now is only one signature,
self.assertEqual(len(md_obj.signatures), 1)
# ... valid for that key.
timestamp_key.verify_signature(md_obj)
with self.assertRaises(exceptions.UnsignedMetadataError):
targets_key.verify_signature(md_obj)
timestamp_key.verify_signature(ts_sig, data)
with self.assertRaises(sslib_exceptions.VerificationError):
targets_key.verify_signature(ts_sig, data)
def test_sign_failures(self) -> None:
# Test throwing UnsignedMetadataError because of signing problems
@ -248,7 +246,7 @@ def test_sign_failures(self) -> None:
with self.assertRaises(exceptions.UnsignedMetadataError):
md.sign(sslib_signer)
def test_verify_failures(self) -> None:
def test_key_verify_failures(self) -> None:
root_path = os.path.join(self.repo_dir, "metadata", "root.json")
root = Metadata[Root].from_file(root_path).signed
@ -259,36 +257,36 @@ def test_verify_failures(self) -> None:
# Load sample metadata (timestamp)
path = os.path.join(self.repo_dir, "metadata", "timestamp.json")
md_obj = Metadata.from_file(path)
sig = md_obj.signatures[timestamp_keyid]
data = CanonicalJSONSerializer().serialize(md_obj.signed)
# Test failure on unknown scheme (securesystemslib
# UnsupportedAlgorithmError)
scheme = timestamp_key.scheme
timestamp_key.scheme = "foo"
with self.assertRaises(exceptions.UnsignedMetadataError):
timestamp_key.verify_signature(md_obj)
with self.assertRaises(sslib_exceptions.VerificationError):
timestamp_key.verify_signature(sig, data)
timestamp_key.scheme = scheme
# Test failure on broken public key data (securesystemslib
# CryptoError)
public = timestamp_key.keyval["public"]
timestamp_key.keyval["public"] = "ffff"
with self.assertRaises(exceptions.UnsignedMetadataError):
timestamp_key.verify_signature(md_obj)
with self.assertRaises(sslib_exceptions.VerificationError):
timestamp_key.verify_signature(sig, data)
timestamp_key.keyval["public"] = public
# Test failure with invalid signature (securesystemslib
# FormatError)
sig = md_obj.signatures[timestamp_keyid]
correct_sig = sig.signature
sig.signature = "foo"
with self.assertRaises(exceptions.UnsignedMetadataError):
timestamp_key.verify_signature(md_obj)
incorrect_sig = copy(sig)
incorrect_sig.signature = "foo"
with self.assertRaises(sslib_exceptions.VerificationError):
timestamp_key.verify_signature(incorrect_sig, data)
# Test failure with valid but incorrect signature
sig.signature = "ff" * 64
with self.assertRaises(exceptions.UnsignedMetadataError):
timestamp_key.verify_signature(md_obj)
sig.signature = correct_sig
incorrect_sig.signature = "ff" * 64
with self.assertRaises(sslib_exceptions.UnverifiedSignatureError):
timestamp_key.verify_signature(incorrect_sig, data)
def test_metadata_signed_is_expired(self) -> None:
# Use of Snapshot is arbitrary, we're just testing the base class
@ -355,6 +353,15 @@ def test_metadata_verify_delegate(self) -> None:
root.verify_delegate(Snapshot.type, snapshot)
snapshot.signed.expires = expires
# verify fails if sslib verify fails with VerificationError
# (in this case signature is malformed)
keyid = next(iter(root.signed.roles[Snapshot.type].keyids))
good_sig = snapshot.signatures[keyid].signature
snapshot.signatures[keyid].signature = "foo"
with self.assertRaises(exceptions.UnsignedMetadataError):
root.verify_delegate(Snapshot.type, snapshot)
snapshot.signatures[keyid].signature = good_sig
# verify fails if roles keys do not sign the metadata
with self.assertRaises(exceptions.UnsignedMetadataError):
root.verify_delegate(Timestamp.type, snapshot)
@ -382,14 +389,9 @@ def test_key_class(self) -> None:
# Test if from_securesystemslib_key removes the private key from keyval
# of a securesystemslib key dictionary.
sslib_key = generate_ed25519_key()
key = Key.from_securesystemslib_key(sslib_key)
key = SSlibKey.from_securesystemslib_key(sslib_key)
self.assertFalse("private" in key.keyval.keys())
# Test raising ValueError with non-existent keytype
sslib_key["keytype"] = "bad keytype"
with self.assertRaises(ValueError):
Key.from_securesystemslib_key(sslib_key)
def test_root_add_key_and_revoke_key(self) -> None:
root_path = os.path.join(self.repo_dir, "metadata", "root.json")
root = Metadata[Root].from_file(root_path)
@ -399,7 +401,7 @@ def test_root_add_key_and_revoke_key(self) -> None:
os.path.join(self.keystore_dir, "root_key2.pub")
)
keyid = root_key2["keyid"]
key_metadata = Key(
key_metadata = SSlibKey(
keyid,
root_key2["keytype"],
root_key2["scheme"],
@ -412,7 +414,7 @@ def test_root_add_key_and_revoke_key(self) -> None:
# Assert that add_key with old argument order will raise an error
with self.assertRaises(ValueError):
root.signed.add_key(Root.type, key_metadata) # type: ignore
root.signed.add_key(Root.type, key_metadata)
# Add new root key
root.signed.add_key(key_metadata, Root.type)
@ -513,7 +515,7 @@ def test_targets_key_api(self) -> None:
# Assert that add_key with old argument order will raise an error
with self.assertRaises(ValueError):
targets.add_key("role1", key) # type: ignore
targets.add_key("role1", key)
# Assert that delegated role "role1" does not contain the new key
self.assertNotIn(key.keyid, targets.delegations.roles["role1"].keyids)

View file

@ -12,17 +12,17 @@
import unittest
from typing import Any, ClassVar, Dict
from securesystemslib.signer import Signature
from securesystemslib.signer import SSlibKey
from tests import utils
from tuf.api.metadata import (
TOP_LEVEL_ROLE_NAMES,
DelegatedRole,
Delegations,
Key,
Metadata,
MetaFile,
Role,
Signature,
SuccinctRoles,
TargetFile,
)
@ -50,7 +50,7 @@ def setUpClass(cls) -> None:
cls.objects["Metadata"] = Metadata(cls.objects["Timestamp"], {})
cls.objects["Signed"] = cls.objects["Timestamp"]
cls.objects["Key"] = Key(
cls.objects["Key"] = SSlibKey(
"id", "rsa", "rsassa-pss-sha256", {"public": "foo"}
)
cls.objects["Role"] = Role(["keyid1", "keyid2"], 3)

View file

@ -168,7 +168,7 @@ def test_valid_key_serialization(self, test_case_data: str) -> None:
@utils.run_sub_tests_with_dataset(invalid_keys)
def test_invalid_key_serialization(self, test_case_data: str) -> None:
case_dict = json.loads(test_case_data)
with self.assertRaises((TypeError, KeyError)):
with self.assertRaises((TypeError, KeyError, ValueError)):
keyid = case_dict.pop("keyid")
Key.from_dict(keyid, case_dict)

View file

@ -34,7 +34,7 @@ allowlist_externals = python3
# Must to be invoked explicitly with, e.g. `tox -e with-sslib-master`
[testenv:with-sslib-master]
commands_pre =
python3 -m pip install git+https://github.com/secure-systems-lab/securesystemslib.git@master#egg=securesystemslib[crypto,pynacl]
python3 -m pip install --force-reinstall git+https://github.com/secure-systems-lab/securesystemslib.git@master#egg=securesystemslib[crypto,pynacl]
commands =
python3 -m coverage run aggregate_tests.py

View file

@ -53,16 +53,14 @@
from securesystemslib import exceptions as sslib_exceptions
from securesystemslib import hash as sslib_hash
from securesystemslib import keys as sslib_keys
from securesystemslib.signer import Signature, Signer
from securesystemslib.signer import Key, Signature, Signer
from securesystemslib.storage import FilesystemBackend, StorageBackendInterface
from securesystemslib.util import persist_temp_file
from tuf.api import exceptions
from tuf.api.exceptions import LengthOrHashMismatchError, UnsignedMetadataError
from tuf.api.serialization import (
MetadataDeserializer,
MetadataSerializer,
SerializationError,
SignedSerializer,
)
@ -220,7 +218,7 @@ def from_file(
``securesystemslib.storage.StorageBackendInterface``.
Default is ``FilesystemBackend`` (i.e. a local file).
Raises:
exceptions.StorageError: The file cannot be read.
StorageError: The file cannot be read.
tuf.api.serialization.DeserializationError:
The file cannot be deserialized.
@ -330,7 +328,7 @@ def to_file(
Raises:
tuf.api.serialization.SerializationError:
The metadata object cannot be serialized.
exceptions.StorageError: The file cannot be written.
StorageError: The file cannot be written.
"""
bytes_data = self.to_bytes(serializer)
@ -361,7 +359,7 @@ def sign(
Raises:
tuf.api.serialization.SerializationError:
``signed`` cannot be serialized.
exceptions.UnsignedMetadataError: Signing errors.
UnsignedMetadataError: Signing errors.
Returns:
``securesystemslib.signer.Signature`` object that was added into
@ -380,9 +378,7 @@ def sign(
try:
signature = signer.sign(bytes_data)
except Exception as e:
raise exceptions.UnsignedMetadataError(
"Problem signing the metadata"
) from e
raise UnsignedMetadataError("Problem signing the metadata") from e
if not append:
self.signatures.clear()
@ -413,43 +409,40 @@ def verify_delegate(
TypeError: called this function on non-delegating metadata class.
"""
# Find the keys and role in delegator metadata
role: Optional[Role] = None
if isinstance(self.signed, Root):
keys = self.signed.keys
role = self.signed.roles.get(delegated_role)
elif isinstance(self.signed, Targets):
if self.signed.delegations is None:
raise ValueError(f"No delegation found for {delegated_role}")
keys = self.signed.delegations.keys
if self.signed.delegations.roles is not None:
role = self.signed.delegations.roles.get(delegated_role)
elif self.signed.delegations.succinct_roles is not None:
if self.signed.delegations.succinct_roles.is_delegated_role(
delegated_role
):
role = self.signed.delegations.succinct_roles
else:
if self.signed.type not in ["root", "targets"]:
raise TypeError("Call is valid only on delegator metadata")
if role is None:
raise ValueError(f"No delegation found for {delegated_role}")
if signed_serializer is None:
# pylint: disable=import-outside-toplevel
from tuf.api.serialization.json import CanonicalJSONSerializer
signed_serializer = CanonicalJSONSerializer()
data = signed_serializer.serialize(delegated_metadata.signed)
role = self.signed.get_delegated_role(delegated_role)
# verify that delegated_metadata is signed by threshold of unique keys
signing_keys = set()
for keyid in role.keyids:
key = keys[keyid]
try:
key.verify_signature(delegated_metadata, signed_serializer)
signing_keys.add(key.keyid)
except exceptions.UnsignedMetadataError:
logger.debug(
"Key %s failed to verify %s", keyid, delegated_role
)
key = self.signed.get_key(keyid)
except ValueError:
logger.info("No key for keyid %s", keyid)
continue
if keyid not in delegated_metadata.signatures:
logger.info("No signature for keyid %s", keyid)
continue
sig = delegated_metadata.signatures[keyid]
try:
key.verify_signature(sig, data)
signing_keys.add(keyid)
except sslib_exceptions.UnverifiedSignatureError:
logger.info("Key %s failed to verify %s", keyid, delegated_role)
if len(signing_keys) < role.threshold:
raise exceptions.UnsignedMetadataError(
raise UnsignedMetadataError(
f"{delegated_role} was signed by {len(signing_keys)}/"
f"{role.threshold} keys",
)
@ -615,178 +608,6 @@ def is_expired(self, reference_time: Optional[datetime] = None) -> bool:
return reference_time >= self.expires
class Key:
"""A container class representing the public portion of a Key.
Supported key content (type, scheme and keyval) is defined in
`` Securesystemslib``.
*All parameters named below are not just constructor arguments but also
instance attributes.*
Args:
keyid: Key identifier that is unique within the metadata it is used in.
Keyid is not verified to be the hash of a specific representation
of the key.
keytype: Key type, e.g. "rsa", "ed25519" or "ecdsa-sha2-nistp256".
scheme: Signature scheme. For example:
"rsassa-pss-sha256", "ed25519", and "ecdsa-sha2-nistp256".
keyval: Opaque key content
unrecognized_fields: Dictionary of all attributes that are not managed
by TUF Metadata API
Raises:
TypeError: Invalid type for an argument.
"""
def __init__(
self,
keyid: str,
keytype: str,
scheme: str,
keyval: Dict[str, str],
unrecognized_fields: Optional[Dict[str, Any]] = None,
):
if not all(
isinstance(at, str) for at in [keyid, keytype, scheme]
) or not isinstance(keyval, dict):
raise TypeError("Unexpected Key attributes types!")
self.keyid = keyid
self.keytype = keytype
self.scheme = scheme
self.keyval = keyval
if unrecognized_fields is None:
unrecognized_fields = {}
self.unrecognized_fields = unrecognized_fields
def __eq__(self, other: Any) -> bool:
if not isinstance(other, Key):
return False
return (
self.keyid == other.keyid
and self.keytype == other.keytype
and self.scheme == other.scheme
and self.keyval == other.keyval
and self.unrecognized_fields == other.unrecognized_fields
)
@classmethod
def from_dict(cls, keyid: str, key_dict: Dict[str, Any]) -> "Key":
"""Create ``Key`` object from its json/dict representation.
Raises:
KeyError, TypeError: Invalid arguments.
"""
keytype = key_dict.pop("keytype")
scheme = key_dict.pop("scheme")
keyval = key_dict.pop("keyval")
# All fields left in the key_dict are unrecognized.
return cls(keyid, keytype, scheme, keyval, key_dict)
def to_dict(self) -> Dict[str, Any]:
"""Return the dictionary representation of self."""
return {
"keytype": self.keytype,
"scheme": self.scheme,
"keyval": self.keyval,
**self.unrecognized_fields,
}
def to_securesystemslib_key(self) -> Dict[str, Any]:
"""Return a ``Securesystemslib`` compatible representation of self."""
return {
"keyid": self.keyid,
"keytype": self.keytype,
"scheme": self.scheme,
"keyval": self.keyval,
}
@classmethod
def from_securesystemslib_key(cls, key_dict: Dict[str, Any]) -> "Key":
"""Create a ``Key`` object from a securesystemlib key json/dict representation
removing the private key from keyval.
Args:
key_dict: Key in securesystemlib dict representation.
Raises:
ValueError: ``key_dict`` value is not following the securesystemslib
format.
"""
try:
key_meta = sslib_keys.format_keyval_to_metadata(
key_dict["keytype"],
key_dict["scheme"],
key_dict["keyval"],
)
except sslib_exceptions.FormatError as e:
raise ValueError(
"key_dict value is not following the securesystemslib format"
) from e
return cls(
key_dict["keyid"],
key_meta["keytype"],
key_meta["scheme"],
key_meta["keyval"],
)
def verify_signature(
self,
metadata: Metadata,
signed_serializer: Optional[SignedSerializer] = None,
) -> None:
"""Verify that the ``metadata.signatures`` contains a signature made
with this key, correctly signing ``metadata.signed``.
Args:
metadata: Metadata to verify
signed_serializer: ``SignedSerializer`` to serialize
``metadata.signed`` with. Default is ``CanonicalJSONSerializer``.
Raises:
UnsignedMetadataError: The signature could not be verified for a
variety of possible reasons: see error message.
"""
try:
signature = metadata.signatures[self.keyid]
except KeyError:
raise exceptions.UnsignedMetadataError(
f"No signature for key {self.keyid} found in metadata"
) from None
if signed_serializer is None:
# pylint: disable=import-outside-toplevel
from tuf.api.serialization.json import CanonicalJSONSerializer
signed_serializer = CanonicalJSONSerializer()
try:
if not sslib_keys.verify_signature(
self.to_securesystemslib_key(),
signature.to_dict(),
signed_serializer.serialize(metadata.signed),
):
raise exceptions.UnsignedMetadataError(
f"Failed to verify {self.keyid} signature"
)
except (
sslib_exceptions.CryptoError,
sslib_exceptions.FormatError,
sslib_exceptions.UnsupportedAlgorithmError,
SerializationError,
) as e:
# Log unexpected failure, but continue as if there was no signature
logger.warning(
"Key %s failed to verify sig: %s", self.keyid, str(e)
)
raise exceptions.UnsignedMetadataError(
f"Failed to verify {self.keyid} signature"
) from e
class Role:
"""Container that defines which keys are required to sign roles metadata.
@ -993,6 +814,24 @@ def revoke_key(self, keyid: str, role: str) -> None:
del self.keys[keyid]
def get_delegated_role(self, delegated_role: str) -> Role:
"""Return the role object for the given delegated role.
Raises ValueError if delegated_role is not actually delegated."""
if delegated_role not in self.roles:
raise ValueError(f"Delegated role {delegated_role} not found")
return self.roles[delegated_role]
def get_key(self, keyid: str) -> Key:
"""Return the key object for the given keyid.
Raises ValueError if key is not found."""
if keyid not in self.keys:
raise ValueError(f"Key {keyid} not found")
return self.keys[keyid]
class BaseFile:
"""A base class of ``MetaFile`` and ``TargetFile``.
@ -1018,13 +857,13 @@ def _verify_hashes(
sslib_exceptions.UnsupportedAlgorithmError,
sslib_exceptions.FormatError,
) as e:
raise exceptions.LengthOrHashMismatchError(
raise LengthOrHashMismatchError(
f"Unsupported algorithm '{algo}'"
) from e
observed_hash = digest_object.hexdigest()
if observed_hash != exp_hash:
raise exceptions.LengthOrHashMismatchError(
raise LengthOrHashMismatchError(
f"Observed hash {observed_hash} does not match "
f"expected hash {exp_hash}"
)
@ -1042,7 +881,7 @@ def _verify_length(
observed_length = data.tell()
if observed_length != expected_length:
raise exceptions.LengthOrHashMismatchError(
raise LengthOrHashMismatchError(
f"Observed length {observed_length} does not match "
f"expected length {expected_length}"
)
@ -2052,3 +1891,31 @@ def revoke_key(self, keyid: str, role: Optional[str] = None) -> None:
self.delegations.succinct_roles.keyids.remove(keyid)
del self.delegations.keys[keyid]
def get_delegated_role(self, delegated_role: str) -> Role:
"""Return the role object for the given delegated role.
Raises ValueError if delegated_role is not actually delegated."""
if self.delegations is None:
raise ValueError("No delegations found")
if self.delegations.roles is not None:
role: Optional[Role] = self.delegations.roles.get(delegated_role)
else:
role = self.delegations.succinct_roles
if not role:
raise ValueError(f"Delegated role {delegated_role} not found")
return role
def get_key(self, keyid: str) -> Key:
"""Return the key object for the given keyid.
Raises ValueError if keyid is not found."""
if self.delegations is None:
raise ValueError("No delegations found")
if keyid not in self.delegations.keys:
raise ValueError(f"Key {keyid} not found")
return self.delegations.keys[keyid]