python-tuf/tests/test_api.py
Martin Vrachev 29f936b76d Tests: address new pylint warnings
After the recent changes there are a couple of new pylint warnings that
appeared.
They are caused by the new test file that was added
test_updater_top_level_update.py and the limit of public functions was
reached in the TestMetadata class in test_api.py
The warnings should be addressed before enabling all of the linters
on the tests files.

Signed-off-by: Martin Vrachev <mvrachev@vmware.com>
2021-11-12 17:10:06 +02:00

744 lines
30 KiB
Python
Executable file

#!/usr/bin/env python
# Copyright 2020, New York University and the TUF contributors
# SPDX-License-Identifier: MIT OR Apache-2.0
""" Unit tests for api/metadata.py
"""
import json
import logging
import os
import shutil
import sys
import tempfile
import unittest
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
from securesystemslib import hash as sslib_hash
from securesystemslib.interface import (
import_ed25519_privatekey_from_file,
import_ed25519_publickey_from_file,
)
from securesystemslib.keys import generate_ed25519_key
from securesystemslib.signer import Signature, SSlibSigner
from tests import utils
from tuf import exceptions
from tuf.api.metadata import (
DelegatedRole,
Key,
Metadata,
MetaFile,
Root,
Snapshot,
TargetFile,
Targets,
Timestamp,
)
from tuf.api.serialization import DeserializationError
from tuf.api.serialization.json import CanonicalJSONSerializer, JSONSerializer
logger = logging.getLogger(__name__)
# pylint: disable=too-many-public-methods
class TestMetadata(unittest.TestCase):
"""Tests for public API of all classes in 'tuf/api/metadata.py'."""
@classmethod
def setUpClass(cls):
# Create a temporary directory to store the repository, metadata, and
# target files. 'temporary_directory' must be deleted in
# TearDownClass() so that temporary files are always removed, even when
# exceptions occur.
cls.temporary_directory = tempfile.mkdtemp(dir=os.getcwd())
test_repo_data = os.path.join(
os.path.dirname(os.path.realpath(__file__)), "repository_data"
)
cls.repo_dir = os.path.join(cls.temporary_directory, "repository")
shutil.copytree(
os.path.join(test_repo_data, "repository"), cls.repo_dir
)
cls.keystore_dir = os.path.join(cls.temporary_directory, "keystore")
shutil.copytree(
os.path.join(test_repo_data, "keystore"), cls.keystore_dir
)
# Load keys into memory
cls.keystore = {}
for role in ["delegation", "snapshot", "targets", "timestamp"]:
cls.keystore[role] = import_ed25519_privatekey_from_file(
os.path.join(cls.keystore_dir, role + "_key"),
password="password",
)
@classmethod
def tearDownClass(cls):
# Remove the temporary repository directory, which should contain all
# the metadata, targets, and key files generated for the test cases.
shutil.rmtree(cls.temporary_directory)
def test_generic_read(self):
for metadata, inner_metadata_cls in [
("root", Root),
("snapshot", Snapshot),
("timestamp", Timestamp),
("targets", Targets),
]:
# Load JSON-formatted metdata of each supported type from file
# and from out-of-band read JSON string
path = os.path.join(self.repo_dir, "metadata", metadata + ".json")
md_obj = Metadata.from_file(path)
with open(path, "rb") as f:
md_obj2 = Metadata.from_bytes(f.read())
# Assert that both methods instantiate the right inner class for
# each metadata type and ...
self.assertTrue(isinstance(md_obj.signed, inner_metadata_cls))
self.assertTrue(isinstance(md_obj2.signed, inner_metadata_cls))
# ... and return the same object (compared by dict representation)
self.assertDictEqual(md_obj.to_dict(), md_obj2.to_dict())
# Assert that it chokes correctly on an unknown metadata type
bad_metadata_path = "bad-metadata.json"
bad_metadata = {"signed": {"_type": "bad-metadata"}}
bad_string = json.dumps(bad_metadata).encode("utf-8")
with open(bad_metadata_path, "wb") as f:
f.write(bad_string)
with self.assertRaises(DeserializationError):
Metadata.from_file(bad_metadata_path)
with self.assertRaises(DeserializationError):
Metadata.from_bytes(bad_string)
os.remove(bad_metadata_path)
def test_compact_json(self):
path = os.path.join(self.repo_dir, "metadata", "targets.json")
md_obj = Metadata.from_file(path)
self.assertTrue(
len(JSONSerializer(compact=True).serialize(md_obj))
< len(JSONSerializer().serialize(md_obj))
)
def test_read_write_read_compare(self):
for metadata in ["root", "snapshot", "timestamp", "targets"]:
path = os.path.join(self.repo_dir, "metadata", metadata + ".json")
md_obj = Metadata.from_file(path)
path_2 = path + ".tmp"
md_obj.to_file(path_2)
md_obj_2 = Metadata.from_file(path_2)
self.assertDictEqual(md_obj.to_dict(), md_obj_2.to_dict())
os.remove(path_2)
def test_to_from_bytes(self):
for metadata in ["root", "snapshot", "timestamp", "targets"]:
path = os.path.join(self.repo_dir, "metadata", metadata + ".json")
with open(path, "rb") as f:
metadata_bytes = f.read()
md_obj = Metadata.from_bytes(metadata_bytes)
# Comparate that from_bytes/to_bytes doesn't change the content
# for two cases for the serializer: noncompact and compact.
# Case 1: test noncompact by overriding the default serializer.
self.assertEqual(md_obj.to_bytes(JSONSerializer()), metadata_bytes)
# Case 2: test compact by using the default serializer.
obj_bytes = md_obj.to_bytes()
metadata_obj_2 = Metadata.from_bytes(obj_bytes)
self.assertEqual(metadata_obj_2.to_bytes(), obj_bytes)
def test_sign_verify(self):
root_path = os.path.join(self.repo_dir, "metadata", "root.json")
root = Metadata[Root].from_file(root_path).signed
# Locate the public keys we need from root
targets_keyid = next(iter(root.roles["targets"].keyids))
targets_key = root.keys[targets_keyid]
snapshot_keyid = next(iter(root.roles["snapshot"].keyids))
snapshot_key = root.keys[snapshot_keyid]
timestamp_keyid = next(iter(root.roles["timestamp"].keyids))
timestamp_key = root.keys[timestamp_keyid]
# Load sample metadata (targets) and assert ...
path = os.path.join(self.repo_dir, "metadata", "targets.json")
md_obj = Metadata.from_file(path)
# ... it has a single existing signature,
self.assertEqual(len(md_obj.signatures), 1)
# ... which is valid for the correct key.
targets_key.verify_signature(md_obj)
with self.assertRaises(exceptions.UnsignedMetadataError):
snapshot_key.verify_signature(md_obj)
# Test verifying with explicitly set serializer
targets_key.verify_signature(md_obj, CanonicalJSONSerializer())
with self.assertRaises(exceptions.UnsignedMetadataError):
targets_key.verify_signature(md_obj, JSONSerializer())
sslib_signer = SSlibSigner(self.keystore["snapshot"])
# Append a new signature with the unrelated key and assert that ...
sig = md_obj.sign(sslib_signer, append=True)
# ... there are now two signatures, and
self.assertEqual(len(md_obj.signatures), 2)
# ... both are valid for the corresponding keys.
targets_key.verify_signature(md_obj)
snapshot_key.verify_signature(md_obj)
# ... the returned (appended) signature is for snapshot key
self.assertEqual(sig.keyid, snapshot_keyid)
sslib_signer = SSlibSigner(self.keystore["timestamp"])
# Create and assign (don't append) a new signature and assert that ...
md_obj.sign(sslib_signer, append=False)
# ... there now is only one signature,
self.assertEqual(len(md_obj.signatures), 1)
# ... valid for that key.
timestamp_key.verify_signature(md_obj)
with self.assertRaises(exceptions.UnsignedMetadataError):
targets_key.verify_signature(md_obj)
def test_verify_failures(self):
root_path = os.path.join(self.repo_dir, "metadata", "root.json")
root = Metadata[Root].from_file(root_path).signed
# Locate the timestamp public key we need from root
timestamp_keyid = next(iter(root.roles["timestamp"].keyids))
timestamp_key = root.keys[timestamp_keyid]
# Load sample metadata (timestamp)
path = os.path.join(self.repo_dir, "metadata", "timestamp.json")
md_obj = Metadata.from_file(path)
# Test failure on unknown scheme (securesystemslib
# UnsupportedAlgorithmError)
scheme = timestamp_key.scheme
timestamp_key.scheme = "foo"
with self.assertRaises(exceptions.UnsignedMetadataError):
timestamp_key.verify_signature(md_obj)
timestamp_key.scheme = scheme
# Test failure on broken public key data (securesystemslib
# CryptoError)
public = timestamp_key.keyval["public"]
timestamp_key.keyval["public"] = "ffff"
with self.assertRaises(exceptions.UnsignedMetadataError):
timestamp_key.verify_signature(md_obj)
timestamp_key.keyval["public"] = public
# Test failure with invalid signature (securesystemslib
# FormatError)
sig = md_obj.signatures[timestamp_keyid]
correct_sig = sig.signature
sig.signature = "foo"
with self.assertRaises(exceptions.UnsignedMetadataError):
timestamp_key.verify_signature(md_obj)
# Test failure with valid but incorrect signature
sig.signature = "ff" * 64
with self.assertRaises(exceptions.UnsignedMetadataError):
timestamp_key.verify_signature(md_obj)
sig.signature = correct_sig
def test_metadata_base(self):
# Use of Snapshot is arbitrary, we're just testing the base class
# features with real data
snapshot_path = os.path.join(self.repo_dir, "metadata", "snapshot.json")
md = Metadata.from_file(snapshot_path)
self.assertEqual(md.signed.version, 1)
md.signed.bump_version()
self.assertEqual(md.signed.version, 2)
self.assertEqual(md.signed.expires, datetime(2030, 1, 1, 0, 0))
md.signed.bump_expiration()
self.assertEqual(md.signed.expires, datetime(2030, 1, 2, 0, 0))
md.signed.bump_expiration(timedelta(days=365))
self.assertEqual(md.signed.expires, datetime(2031, 1, 2, 0, 0))
# Test is_expired with reference_time provided
is_expired = md.signed.is_expired(md.signed.expires)
self.assertTrue(is_expired)
is_expired = md.signed.is_expired(md.signed.expires + timedelta(days=1))
self.assertTrue(is_expired)
is_expired = md.signed.is_expired(md.signed.expires - timedelta(days=1))
self.assertFalse(is_expired)
# Test is_expired without reference_time,
# manipulating md.signed.expires
expires = md.signed.expires
md.signed.expires = datetime.utcnow()
is_expired = md.signed.is_expired()
self.assertTrue(is_expired)
md.signed.expires = datetime.utcnow() + timedelta(days=1)
is_expired = md.signed.is_expired()
self.assertFalse(is_expired)
md.signed.expires = expires
# Test deserializing metadata with non-unique signatures:
data = md.to_dict()
data["signatures"].append(
{"keyid": data["signatures"][0]["keyid"], "sig": "foo"}
)
with self.assertRaises(ValueError):
Metadata.from_dict(data)
def test_metadata_snapshot(self):
snapshot_path = os.path.join(self.repo_dir, "metadata", "snapshot.json")
snapshot = Metadata[Snapshot].from_file(snapshot_path)
# Create a MetaFile instance representing what we expect
# the updated data to be.
hashes = {
"sha256": "c2986576f5fdfd43944e2b19e775453b96748ec4fe2638a6d2f32f1310967095"
}
fileinfo = MetaFile(2, 123, hashes)
self.assertNotEqual(
snapshot.signed.meta["role1.json"].to_dict(), fileinfo.to_dict()
)
snapshot.signed.update("role1", fileinfo)
self.assertEqual(
snapshot.signed.meta["role1.json"].to_dict(), fileinfo.to_dict()
)
def test_metadata_timestamp(self):
timestamp_path = os.path.join(
self.repo_dir, "metadata", "timestamp.json"
)
timestamp = Metadata[Timestamp].from_file(timestamp_path)
self.assertEqual(timestamp.signed.version, 1)
timestamp.signed.bump_version()
self.assertEqual(timestamp.signed.version, 2)
self.assertEqual(timestamp.signed.expires, datetime(2030, 1, 1, 0, 0))
timestamp.signed.bump_expiration()
self.assertEqual(timestamp.signed.expires, datetime(2030, 1, 2, 0, 0))
timestamp.signed.bump_expiration(timedelta(days=365))
self.assertEqual(timestamp.signed.expires, datetime(2031, 1, 2, 0, 0))
# Test whether dateutil.relativedelta works, this provides a much
# easier to use interface for callers
delta = relativedelta(days=1)
timestamp.signed.bump_expiration(delta)
self.assertEqual(timestamp.signed.expires, datetime(2031, 1, 3, 0, 0))
delta = relativedelta(years=5)
timestamp.signed.bump_expiration(delta)
self.assertEqual(timestamp.signed.expires, datetime(2036, 1, 3, 0, 0))
# Create a MetaFile instance representing what we expect
# the updated data to be.
hashes = {
"sha256": "0ae9664468150a9aa1e7f11feecb32341658eb84292851367fea2da88e8a58dc"
}
fileinfo = MetaFile(2, 520, hashes)
self.assertNotEqual(
timestamp.signed.snapshot_meta.to_dict(), fileinfo.to_dict()
)
timestamp.signed.update(fileinfo)
self.assertEqual(
timestamp.signed.snapshot_meta.to_dict(), fileinfo.to_dict()
)
def test_metadata_verify_delegate(self):
root_path = os.path.join(self.repo_dir, "metadata", "root.json")
root = Metadata[Root].from_file(root_path)
snapshot_path = os.path.join(self.repo_dir, "metadata", "snapshot.json")
snapshot = Metadata[Snapshot].from_file(snapshot_path)
targets_path = os.path.join(self.repo_dir, "metadata", "targets.json")
targets = Metadata[Targets].from_file(targets_path)
role1_path = os.path.join(self.repo_dir, "metadata", "role1.json")
role1 = Metadata[Targets].from_file(role1_path)
role2_path = os.path.join(self.repo_dir, "metadata", "role2.json")
role2 = Metadata[Targets].from_file(role2_path)
# test the expected delegation tree
root.verify_delegate("root", root)
root.verify_delegate("snapshot", snapshot)
root.verify_delegate("targets", targets)
targets.verify_delegate("role1", role1)
role1.verify_delegate("role2", role2)
# only root and targets can verify delegates
with self.assertRaises(TypeError):
snapshot.verify_delegate("snapshot", snapshot)
# verify fails for roles that are not delegated by delegator
with self.assertRaises(ValueError):
root.verify_delegate("role1", role1)
with self.assertRaises(ValueError):
targets.verify_delegate("targets", targets)
# verify fails when delegator has no delegations
with self.assertRaises(ValueError):
role2.verify_delegate("role1", role1)
# verify fails when delegate content is modified
expires = snapshot.signed.expires
snapshot.signed.bump_expiration()
with self.assertRaises(exceptions.UnsignedMetadataError):
root.verify_delegate("snapshot", snapshot)
snapshot.signed.expires = expires
# verify fails if roles keys do not sign the metadata
with self.assertRaises(exceptions.UnsignedMetadataError):
root.verify_delegate("timestamp", snapshot)
# Add a key to snapshot role, make sure the new sig fails to verify
ts_keyid = next(iter(root.signed.roles["timestamp"].keyids))
root.signed.add_key("snapshot", root.signed.keys[ts_keyid])
snapshot.signatures[ts_keyid] = Signature(ts_keyid, "ff" * 64)
# verify succeeds if threshold is reached even if some signatures
# fail to verify
root.verify_delegate("snapshot", snapshot)
# verify fails if threshold of signatures is not reached
root.signed.roles["snapshot"].threshold = 2
with self.assertRaises(exceptions.UnsignedMetadataError):
root.verify_delegate("snapshot", snapshot)
# verify succeeds when we correct the new signature and reach the
# threshold of 2 keys
snapshot.sign(SSlibSigner(self.keystore["timestamp"]), append=True)
root.verify_delegate("snapshot", snapshot)
def test_key_class(self):
# Test if from_securesystemslib_key removes the private key from keyval
# of a securesystemslib key dictionary.
sslib_key = generate_ed25519_key()
key = Key.from_securesystemslib_key(sslib_key)
self.assertFalse("private" in key.keyval.keys())
def test_root_add_key_and_remove_key(self):
root_path = os.path.join(self.repo_dir, "metadata", "root.json")
root = Metadata[Root].from_file(root_path)
# Create a new key
root_key2 = import_ed25519_publickey_from_file(
os.path.join(self.keystore_dir, "root_key2.pub")
)
keyid = root_key2["keyid"]
key_metadata = Key(
keyid,
root_key2["keytype"],
root_key2["scheme"],
root_key2["keyval"],
)
# Assert that root does not contain the new key
self.assertNotIn(keyid, root.signed.roles["root"].keyids)
self.assertNotIn(keyid, root.signed.keys)
# Add new root key
root.signed.add_key("root", key_metadata)
# Assert that key is added
self.assertIn(keyid, root.signed.roles["root"].keyids)
self.assertIn(keyid, root.signed.keys)
# Confirm that the newly added key does not break
# the object serialization
root.to_dict()
# Try adding the same key again and assert its ignored.
pre_add_keyid = root.signed.roles["root"].keyids.copy()
root.signed.add_key("root", key_metadata)
self.assertEqual(pre_add_keyid, root.signed.roles["root"].keyids)
# Add the same key to targets role as well
root.signed.add_key("targets", key_metadata)
# Add the same key to a nonexistent role.
with self.assertRaises(ValueError):
root.signed.add_key("nosuchrole", key_metadata)
# Remove the key from root role (targets role still uses it)
root.signed.remove_key("root", keyid)
self.assertNotIn(keyid, root.signed.roles["root"].keyids)
self.assertIn(keyid, root.signed.keys)
# Remove the key from targets as well
root.signed.remove_key("targets", keyid)
self.assertNotIn(keyid, root.signed.roles["targets"].keyids)
self.assertNotIn(keyid, root.signed.keys)
with self.assertRaises(ValueError):
root.signed.remove_key("root", "nosuchkey")
with self.assertRaises(ValueError):
root.signed.remove_key("nosuchrole", keyid)
def test_is_target_in_pathpattern(self):
# pylint: disable=protected-access
supported_use_cases = [
("foo.tgz", "foo.tgz"),
("foo.tgz", "*"),
("foo.tgz", "*.tgz"),
("foo-version-a.tgz", "foo-version-?.tgz"),
("targets/foo.tgz", "targets/*.tgz"),
("foo/bar/zoo/k.tgz", "foo/bar/zoo/*"),
("foo/bar/zoo/k.tgz", "foo/*/zoo/*"),
("foo/bar/zoo/k.tgz", "*/*/*/*"),
("foo/bar", "f?o/bar"),
("foo/bar", "*o/bar"),
]
for targetpath, pathpattern in supported_use_cases:
self.assertTrue(
DelegatedRole._is_target_in_pathpattern(targetpath, pathpattern)
)
invalid_use_cases = [
("targets/foo.tgz", "*.tgz"),
("/foo.tgz", "*.tgz"),
("targets/foo.tgz", "*"),
("foo-version-alpha.tgz", "foo-version-?.tgz"),
("foo//bar", "*/bar"),
("foo/bar", "f?/bar"),
]
for targetpath, pathpattern in invalid_use_cases:
self.assertFalse(
DelegatedRole._is_target_in_pathpattern(targetpath, pathpattern)
)
def test_metadata_targets(self):
targets_path = os.path.join(self.repo_dir, "metadata", "targets.json")
targets = Metadata[Targets].from_file(targets_path)
# Create a fileinfo dict representing the expected updated data.
filename = "file2.txt"
hashes = {
"sha256": "141f740f53781d1ca54b8a50af22cbf74e44c21a998fa2a8a05aaac2c002886b",
"sha512": "ef5beafa16041bcdd2937140afebd485296cd54f7348ecd5a4d035c09759608de467a7ac0eb58753d0242df873c305e8bffad2454aa48f44480f15efae1cacd0",
}
fileinfo = TargetFile(length=28, hashes=hashes, path=filename)
# Assert that data is not aleady equal
self.assertNotEqual(
targets.signed.targets[filename].to_dict(), fileinfo.to_dict()
)
# Update an already existing fileinfo
targets.signed.update(fileinfo)
# Verify that data is updated
self.assertEqual(
targets.signed.targets[filename].to_dict(), fileinfo.to_dict()
)
def test_targets_key_api(self):
targets_path = os.path.join(self.repo_dir, "metadata", "targets.json")
targets: Targets = Metadata[Targets].from_file(targets_path).signed
# Add a new delegated role "role2" in targets
delegated_role = DelegatedRole.from_dict(
{
"keyids": [],
"name": "role2",
"paths": ["fn3", "fn4"],
"terminating": False,
"threshold": 1,
}
)
targets.delegations.roles["role2"] = delegated_role
key_dict = {
"keytype": "ed25519",
"keyval": {
"public": "edcd0a32a07dce33f7c7873aaffbff36d20ea30787574ead335eefd337e4dacd"
},
"scheme": "ed25519",
}
key = Key.from_dict("id2", key_dict)
# Assert that delegated role "role1" does not contain the new key
self.assertNotIn(key.keyid, targets.delegations.roles["role1"].keyids)
targets.add_key("role1", key)
# Assert that the new key is added to the delegated role "role1"
self.assertIn(key.keyid, targets.delegations.roles["role1"].keyids)
# Confirm that the newly added key does not break the obj serialization
targets.to_dict()
# Try adding the same key again and assert its ignored.
past_keyid = targets.delegations.roles["role1"].keyids.copy()
targets.add_key("role1", key)
self.assertEqual(past_keyid, targets.delegations.roles["role1"].keyids)
# Try adding a key to a delegated role that doesn't exists
with self.assertRaises(ValueError):
targets.add_key("nosuchrole", key)
# Add the same key to "role2" as well
targets.add_key("role2", key)
# Remove the key from "role1" role ("role2" still uses it)
targets.remove_key("role1", key.keyid)
# Assert that delegated role "role1" doesn't contain the key.
self.assertNotIn(key.keyid, targets.delegations.roles["role1"].keyids)
self.assertIn(key.keyid, targets.delegations.roles["role2"].keyids)
# Remove the key from "role2" as well
targets.remove_key("role2", key.keyid)
self.assertNotIn(key.keyid, targets.delegations.roles["role2"].keyids)
# Try remove key not used by "role1"
with self.assertRaises(ValueError):
targets.remove_key("role1", key.keyid)
# Try removing a key from delegated role that doesn't exists
with self.assertRaises(ValueError):
targets.remove_key("nosuchrole", key.keyid)
# Remove delegations as a whole
targets.delegations = None
# Test that calling add_key and remove_key throws an error
# and that delegations is still None after each of the api calls
with self.assertRaises(ValueError):
targets.add_key("role1", key)
self.assertTrue(targets.delegations is None)
with self.assertRaises(ValueError):
targets.remove_key("role1", key.keyid)
self.assertTrue(targets.delegations is None)
def test_length_and_hash_validation(self):
# Test metadata files' hash and length verification.
# Use timestamp to get a MetaFile object and snapshot
# for untrusted metadata file to verify.
timestamp_path = os.path.join(
self.repo_dir, "metadata", "timestamp.json"
)
timestamp = Metadata[Timestamp].from_file(timestamp_path)
snapshot_metafile = timestamp.signed.snapshot_meta
snapshot_path = os.path.join(self.repo_dir, "metadata", "snapshot.json")
with open(snapshot_path, "rb") as file:
# test with data as a file object
snapshot_metafile.verify_length_and_hashes(file)
file.seek(0)
data = file.read()
# test with data as bytes
snapshot_metafile.verify_length_and_hashes(data)
# test exceptions
expected_length = snapshot_metafile.length
snapshot_metafile.length = 2345
with self.assertRaises(exceptions.LengthOrHashMismatchError):
snapshot_metafile.verify_length_and_hashes(data)
snapshot_metafile.length = expected_length
snapshot_metafile.hashes = {"sha256": "incorrecthash"}
with self.assertRaises(exceptions.LengthOrHashMismatchError):
snapshot_metafile.verify_length_and_hashes(data)
snapshot_metafile.hashes = {
"unsupported-alg": "8f88e2ba48b412c3843e9bb26e1b6f8fc9e98aceb0fbaa97ba37b4c98717d7ab"
}
with self.assertRaises(exceptions.LengthOrHashMismatchError):
snapshot_metafile.verify_length_and_hashes(data)
# Test wrong algorithm format (sslib.FormatError)
snapshot_metafile.hashes = {
256: "8f88e2ba48b412c3843e9bb26e1b6f8fc9e98aceb0fbaa97ba37b4c98717d7ab"
}
with self.assertRaises(exceptions.LengthOrHashMismatchError):
snapshot_metafile.verify_length_and_hashes(data)
# test optional length and hashes
snapshot_metafile.length = None
snapshot_metafile.hashes = None
snapshot_metafile.verify_length_and_hashes(data)
# Test target files' hash and length verification
targets_path = os.path.join(self.repo_dir, "metadata", "targets.json")
targets = Metadata[Targets].from_file(targets_path)
file1_targetfile = targets.signed.targets["file1.txt"]
filepath = os.path.join(self.repo_dir, "targets", "file1.txt")
with open(filepath, "rb") as file1:
file1_targetfile.verify_length_and_hashes(file1)
# test exceptions
expected_length = file1_targetfile.length
file1_targetfile.length = 2345
with self.assertRaises(exceptions.LengthOrHashMismatchError):
file1_targetfile.verify_length_and_hashes(file1)
file1_targetfile.length = expected_length
file1_targetfile.hashes = {"sha256": "incorrecthash"}
with self.assertRaises(exceptions.LengthOrHashMismatchError):
file1_targetfile.verify_length_and_hashes(file1)
def test_targetfile_from_file(self):
# Test with an existing file and valid hash algorithm
file_path = os.path.join(self.repo_dir, "targets", "file1.txt")
targetfile_from_file = TargetFile.from_file(
file_path, file_path, ["sha256"]
)
with open(file_path, "rb") as file:
targetfile_from_file.verify_length_and_hashes(file)
# Test with a non-existing file
file_path = os.path.join(self.repo_dir, "targets", "file123.txt")
with self.assertRaises(FileNotFoundError):
TargetFile.from_file(
file_path, file_path, [sslib_hash.DEFAULT_HASH_ALGORITHM]
)
# Test with an unsupported algorithm
file_path = os.path.join(self.repo_dir, "targets", "file1.txt")
with self.assertRaises(exceptions.UnsupportedAlgorithmError):
TargetFile.from_file(file_path, file_path, ["123"])
def test_targetfile_from_data(self):
data = b"Inline test content"
target_file_path = os.path.join(self.repo_dir, "targets", "file1.txt")
# Test with a valid hash algorithm
targetfile_from_data = TargetFile.from_data(
target_file_path, data, ["sha256"]
)
targetfile_from_data.verify_length_and_hashes(data)
# Test with no algorithms specified
targetfile_from_data = TargetFile.from_data(target_file_path, data)
targetfile_from_data.verify_length_and_hashes(data)
def test_is_delegated_role(self):
# test path matches
# see more extensive tests in test_is_target_in_pathpattern()
for paths in [
["a/path"],
["otherpath", "a/path"],
["*/?ath"],
]:
role = DelegatedRole("", [], 1, False, paths, None)
self.assertFalse(role.is_delegated_path("a/non-matching path"))
self.assertTrue(role.is_delegated_path("a/path"))
# test path hash prefix matches: sha256 sum of "a/path" is 927b0ecf9...
for hash_prefixes in [
["927b0ecf9"],
["other prefix", "927b0ecf9"],
["927b0"],
["92"],
]:
role = DelegatedRole("", [], 1, False, None, hash_prefixes)
self.assertFalse(role.is_delegated_path("a/non-matching path"))
self.assertTrue(role.is_delegated_path("a/path"))
# Run unit test.
if __name__ == "__main__":
utils.configure_test_logging(sys.argv)
unittest.main()