mirror of
https://github.com/theupdateframework/python-tuf
synced 2026-05-24 10:08:28 +00:00
ngclient: Add initial testing
This testing lacks coverage but demonstrates the happy cases. Signed-off-by: Jussi Kukkonen <jkukkonen@vmware.com>
This commit is contained in:
parent
765c6fe020
commit
fac151da3d
2 changed files with 295 additions and 0 deletions
124
tests/test_metadata_bundle.py
Normal file
124
tests/test_metadata_bundle.py
Normal file
|
|
@ -0,0 +1,124 @@
|
|||
import json
|
||||
import logging
|
||||
import os
|
||||
import shutil
|
||||
import sys
|
||||
import tempfile
|
||||
import unittest
|
||||
|
||||
from tuf import exceptions
|
||||
from tuf.api.metadata import Metadata
|
||||
from tuf.ngclient._internal.metadata_bundle import MetadataBundle
|
||||
|
||||
from tests import utils
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class TestMetadataBundle(unittest.TestCase):
|
||||
|
||||
def test_update(self):
|
||||
repo_dir = os.path.join(os.getcwd(), 'repository_data', 'repository', 'metadata')
|
||||
|
||||
with open(os.path.join(repo_dir, "root.json"), "rb") as f:
|
||||
bundle = MetadataBundle(f.read())
|
||||
bundle.root_update_finished()
|
||||
|
||||
with open(os.path.join(repo_dir, "timestamp.json"), "rb") as f:
|
||||
bundle.update_timestamp(f.read())
|
||||
with open(os.path.join(repo_dir, "snapshot.json"), "rb") as f:
|
||||
bundle.update_snapshot(f.read())
|
||||
with open(os.path.join(repo_dir, "targets.json"), "rb") as f:
|
||||
bundle.update_targets(f.read())
|
||||
with open(os.path.join(repo_dir, "role1.json"), "rb") as f:
|
||||
bundle.update_delegated_targets(f.read(), "role1", "targets")
|
||||
with open(os.path.join(repo_dir, "role2.json"), "rb") as f:
|
||||
bundle.update_delegated_targets(f.read(), "role2", "role1")
|
||||
|
||||
def test_out_of_order_ops(self):
|
||||
repo_dir = os.path.join(os.getcwd(), 'repository_data', 'repository', 'metadata')
|
||||
data={}
|
||||
for md in ["root", "timestamp", "snapshot", "targets", "role1"]:
|
||||
with open(os.path.join(repo_dir, f"{md}.json"), "rb") as f:
|
||||
data[md] = f.read()
|
||||
|
||||
bundle = MetadataBundle(data["root"])
|
||||
|
||||
# Update timestamp before root is finished
|
||||
with self.assertRaises(RuntimeError):
|
||||
bundle.update_timestamp(data["timestamp"])
|
||||
|
||||
bundle.root_update_finished()
|
||||
with self.assertRaises(RuntimeError):
|
||||
bundle.root_update_finished()
|
||||
|
||||
# Update snapshot before timestamp
|
||||
with self.assertRaises(RuntimeError):
|
||||
bundle.update_snapshot(data["snapshot"])
|
||||
|
||||
bundle.update_timestamp(data["timestamp"])
|
||||
|
||||
# Update targets before snapshot
|
||||
with self.assertRaises(RuntimeError):
|
||||
bundle.update_targets(data["targets"])
|
||||
|
||||
bundle.update_snapshot(data["snapshot"])
|
||||
|
||||
#update timestamp after snapshot
|
||||
with self.assertRaises(RuntimeError):
|
||||
bundle.update_timestamp(data["timestamp"])
|
||||
|
||||
# Update delegated targets before targets
|
||||
with self.assertRaises(RuntimeError):
|
||||
bundle.update_delegated_targets(data["role1"], "role1", "targets")
|
||||
|
||||
bundle.update_targets(data["targets"])
|
||||
bundle.update_delegated_targets(data["role1"], "role1", "targets")
|
||||
|
||||
def test_update_with_invalid_json(self):
|
||||
repo_dir = os.path.join(os.getcwd(), 'repository_data', 'repository', 'metadata')
|
||||
data={}
|
||||
for md in ["root", "timestamp", "snapshot", "targets", "role1"]:
|
||||
with open(os.path.join(repo_dir, f"{md}.json"), "rb") as f:
|
||||
data[md] = f.read()
|
||||
|
||||
# root.json not a json file at all
|
||||
with self.assertRaises(exceptions.RepositoryError):
|
||||
MetadataBundle(b"")
|
||||
# root.json is invalid
|
||||
root = Metadata.from_bytes(data["root"])
|
||||
root.signed.version += 1
|
||||
with self.assertRaises(exceptions.RepositoryError):
|
||||
MetadataBundle(json.dumps(root.to_dict()).encode())
|
||||
|
||||
bundle = MetadataBundle(data["root"])
|
||||
bundle.root_update_finished()
|
||||
|
||||
top_level_md = [
|
||||
(data["timestamp"], bundle.update_timestamp),
|
||||
(data["snapshot"], bundle.update_snapshot),
|
||||
(data["targets"], bundle.update_targets),
|
||||
]
|
||||
for metadata, update_func in top_level_md:
|
||||
# metadata is not json
|
||||
with self.assertRaises(exceptions.RepositoryError):
|
||||
update_func(b"")
|
||||
# metadata is invalid
|
||||
md = Metadata.from_bytes(metadata)
|
||||
md.signed.version += 1
|
||||
with self.assertRaises(exceptions.RepositoryError):
|
||||
update_func(json.dumps(md.to_dict()).encode())
|
||||
|
||||
# metadata is of wrong type
|
||||
with self.assertRaises(exceptions.RepositoryError):
|
||||
update_func(data["root"])
|
||||
|
||||
update_func(metadata)
|
||||
|
||||
|
||||
# TODO test updating over initial metadata (new keys, newer timestamp, etc)
|
||||
# TODO test the actual specification checks
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
utils.configure_test_logging(sys.argv)
|
||||
unittest.main()
|
||||
171
tests/test_updater_ng.py
Normal file
171
tests/test_updater_ng.py
Normal file
|
|
@ -0,0 +1,171 @@
|
|||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2021, New York University and the TUF contributors
|
||||
# SPDX-License-Identifier: MIT OR Apache-2.0
|
||||
|
||||
"""Test Updater class
|
||||
"""
|
||||
|
||||
import os
|
||||
import time
|
||||
import shutil
|
||||
import copy
|
||||
import tempfile
|
||||
import logging
|
||||
import errno
|
||||
import sys
|
||||
import unittest
|
||||
import json
|
||||
import tracemalloc
|
||||
|
||||
if sys.version_info >= (3, 3):
|
||||
import unittest.mock as mock
|
||||
else:
|
||||
import mock
|
||||
|
||||
import tuf
|
||||
import tuf.exceptions
|
||||
import tuf.log
|
||||
import tuf.repository_tool as repo_tool
|
||||
import tuf.unittest_toolbox as unittest_toolbox
|
||||
|
||||
from tests import utils
|
||||
from tuf.api import metadata
|
||||
from tuf import ngclient
|
||||
|
||||
import securesystemslib
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TestUpdater(unittest_toolbox.Modified_TestCase):
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
# Create a temporary directory to store the repository, metadata, and target
|
||||
# files. 'temporary_directory' must be deleted in TearDownModule() so that
|
||||
# temporary files are always removed, even when exceptions occur.
|
||||
cls.temporary_directory = tempfile.mkdtemp(dir=os.getcwd())
|
||||
|
||||
# Needed because in some tests simple_server.py cannot be found.
|
||||
# The reason is that the current working directory
|
||||
# has been changed when executing a subprocess.
|
||||
cls.SIMPLE_SERVER_PATH = os.path.join(os.getcwd(), 'simple_server.py')
|
||||
|
||||
# Launch a SimpleHTTPServer (serves files in the current directory).
|
||||
# Test cases will request metadata and target files that have been
|
||||
# pre-generated in 'tuf/tests/repository_data', which will be served
|
||||
# by the SimpleHTTPServer launched here. The test cases of 'test_updater.py'
|
||||
# assume the pre-generated metadata files have a specific structure, such
|
||||
# as a delegated role 'targets/role1', three target files, five key files,
|
||||
# etc.
|
||||
cls.server_process_handler = utils.TestServerProcess(log=logger,
|
||||
server=cls.SIMPLE_SERVER_PATH)
|
||||
|
||||
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
# Cleans the resources and flush the logged lines (if any).
|
||||
cls.server_process_handler.clean()
|
||||
|
||||
# Remove the temporary repository directory, which should contain all the
|
||||
# metadata, targets, and key files generated for the test cases
|
||||
shutil.rmtree(cls.temporary_directory)
|
||||
|
||||
|
||||
|
||||
def setUp(self):
|
||||
# We are inheriting from custom class.
|
||||
unittest_toolbox.Modified_TestCase.setUp(self)
|
||||
|
||||
# Copy the original repository files provided in the test folder so that
|
||||
# any modifications made to repository files are restricted to the copies.
|
||||
# The 'repository_data' directory is expected to exist in 'tuf.tests/'.
|
||||
original_repository_files = os.path.join(os.getcwd(), 'repository_data')
|
||||
temporary_repository_root = \
|
||||
self.make_temp_directory(directory=self.temporary_directory)
|
||||
|
||||
# The original repository, keystore, and client directories will be copied
|
||||
# for each test case.
|
||||
original_repository = os.path.join(original_repository_files, 'repository')
|
||||
original_keystore = os.path.join(original_repository_files, 'keystore')
|
||||
original_client = os.path.join(original_repository_files, 'client', 'test_repository1', 'metadata', 'current')
|
||||
|
||||
# Save references to the often-needed client repository directories.
|
||||
# Test cases need these references to access metadata and target files.
|
||||
self.repository_directory = \
|
||||
os.path.join(temporary_repository_root, 'repository')
|
||||
self.keystore_directory = \
|
||||
os.path.join(temporary_repository_root, 'keystore')
|
||||
|
||||
self.client_directory = os.path.join(temporary_repository_root, 'client')
|
||||
|
||||
# Copy the original 'repository', 'client', and 'keystore' directories
|
||||
# to the temporary repository the test cases can use.
|
||||
shutil.copytree(original_repository, self.repository_directory)
|
||||
shutil.copytree(original_client, self.client_directory)
|
||||
shutil.copytree(original_keystore, self.keystore_directory)
|
||||
|
||||
# 'path/to/tmp/repository' -> 'localhost:8001/tmp/repository'.
|
||||
repository_basepath = self.repository_directory[len(os.getcwd()):]
|
||||
url_prefix = 'http://' + utils.TEST_HOST_ADDRESS + ':' \
|
||||
+ str(self.server_process_handler.port) + repository_basepath
|
||||
|
||||
metadata_url = f"{url_prefix}/metadata/"
|
||||
targets_url = f"{url_prefix}/targets/"
|
||||
# Creating a repository instance. The test cases will use this client
|
||||
# updater to refresh metadata, fetch target files, etc.
|
||||
self.repository_updater = ngclient.Updater(self.client_directory,
|
||||
metadata_url,
|
||||
targets_url)
|
||||
|
||||
def tearDown(self):
|
||||
# We are inheriting from custom class.
|
||||
unittest_toolbox.Modified_TestCase.tearDown(self)
|
||||
|
||||
# Logs stdout and stderr from the sever subprocess.
|
||||
self.server_process_handler.flush_log()
|
||||
|
||||
def test_refresh(self):
|
||||
# All metadata is in local directory already
|
||||
self.repository_updater.refresh()
|
||||
|
||||
# Get targetinfo for 'file1.txt' listed in targets
|
||||
targetinfo1 = self.repository_updater.get_one_valid_targetinfo('file1.txt')
|
||||
# Get targetinfo for 'file3.txt' listed in the delegated role1
|
||||
targetinfo3= self.repository_updater.get_one_valid_targetinfo('file3.txt')
|
||||
|
||||
destination_directory = self.make_temp_directory()
|
||||
updated_targets = self.repository_updater.updated_targets([targetinfo1, targetinfo3],
|
||||
destination_directory)
|
||||
|
||||
self.assertListEqual(updated_targets, [targetinfo1, targetinfo3])
|
||||
|
||||
self.repository_updater.download_target(targetinfo1, destination_directory)
|
||||
updated_targets = self.repository_updater.updated_targets(updated_targets,
|
||||
destination_directory)
|
||||
|
||||
self.assertListEqual(updated_targets, [targetinfo3])
|
||||
|
||||
|
||||
self.repository_updater.download_target(targetinfo3, destination_directory)
|
||||
updated_targets = self.repository_updater.updated_targets(updated_targets,
|
||||
destination_directory)
|
||||
|
||||
self.assertListEqual(updated_targets, [])
|
||||
|
||||
def test_refresh_with_only_local_root(self):
|
||||
os.remove(os.path.join(self.client_directory, "timestamp.json"))
|
||||
os.remove(os.path.join(self.client_directory, "snapshot.json"))
|
||||
os.remove(os.path.join(self.client_directory, "targets.json"))
|
||||
os.remove(os.path.join(self.client_directory, "role1.json"))
|
||||
|
||||
self.repository_updater.refresh()
|
||||
|
||||
# Get targetinfo for 'file3.txt' listed in the delegated role1
|
||||
targetinfo3= self.repository_updater.get_one_valid_targetinfo('file3.txt')
|
||||
|
||||
if __name__ == '__main__':
|
||||
utils.configure_test_logging(sys.argv)
|
||||
unittest.main()
|
||||
Loading…
Reference in a new issue