diff --git a/tests/test_metadata_bundle.py b/tests/test_metadata_bundle.py new file mode 100644 index 00000000..a988b8d3 --- /dev/null +++ b/tests/test_metadata_bundle.py @@ -0,0 +1,124 @@ +import json +import logging +import os +import shutil +import sys +import tempfile +import unittest + +from tuf import exceptions +from tuf.api.metadata import Metadata +from tuf.ngclient._internal.metadata_bundle import MetadataBundle + +from tests import utils + +logger = logging.getLogger(__name__) + +class TestMetadataBundle(unittest.TestCase): + + def test_update(self): + repo_dir = os.path.join(os.getcwd(), 'repository_data', 'repository', 'metadata') + + with open(os.path.join(repo_dir, "root.json"), "rb") as f: + bundle = MetadataBundle(f.read()) + bundle.root_update_finished() + + with open(os.path.join(repo_dir, "timestamp.json"), "rb") as f: + bundle.update_timestamp(f.read()) + with open(os.path.join(repo_dir, "snapshot.json"), "rb") as f: + bundle.update_snapshot(f.read()) + with open(os.path.join(repo_dir, "targets.json"), "rb") as f: + bundle.update_targets(f.read()) + with open(os.path.join(repo_dir, "role1.json"), "rb") as f: + bundle.update_delegated_targets(f.read(), "role1", "targets") + with open(os.path.join(repo_dir, "role2.json"), "rb") as f: + bundle.update_delegated_targets(f.read(), "role2", "role1") + + def test_out_of_order_ops(self): + repo_dir = os.path.join(os.getcwd(), 'repository_data', 'repository', 'metadata') + data={} + for md in ["root", "timestamp", "snapshot", "targets", "role1"]: + with open(os.path.join(repo_dir, f"{md}.json"), "rb") as f: + data[md] = f.read() + + bundle = MetadataBundle(data["root"]) + + # Update timestamp before root is finished + with self.assertRaises(RuntimeError): + bundle.update_timestamp(data["timestamp"]) + + bundle.root_update_finished() + with self.assertRaises(RuntimeError): + bundle.root_update_finished() + + # Update snapshot before timestamp + with self.assertRaises(RuntimeError): + bundle.update_snapshot(data["snapshot"]) + + bundle.update_timestamp(data["timestamp"]) + + # Update targets before snapshot + with self.assertRaises(RuntimeError): + bundle.update_targets(data["targets"]) + + bundle.update_snapshot(data["snapshot"]) + + #update timestamp after snapshot + with self.assertRaises(RuntimeError): + bundle.update_timestamp(data["timestamp"]) + + # Update delegated targets before targets + with self.assertRaises(RuntimeError): + bundle.update_delegated_targets(data["role1"], "role1", "targets") + + bundle.update_targets(data["targets"]) + bundle.update_delegated_targets(data["role1"], "role1", "targets") + + def test_update_with_invalid_json(self): + repo_dir = os.path.join(os.getcwd(), 'repository_data', 'repository', 'metadata') + data={} + for md in ["root", "timestamp", "snapshot", "targets", "role1"]: + with open(os.path.join(repo_dir, f"{md}.json"), "rb") as f: + data[md] = f.read() + + # root.json not a json file at all + with self.assertRaises(exceptions.RepositoryError): + MetadataBundle(b"") + # root.json is invalid + root = Metadata.from_bytes(data["root"]) + root.signed.version += 1 + with self.assertRaises(exceptions.RepositoryError): + MetadataBundle(json.dumps(root.to_dict()).encode()) + + bundle = MetadataBundle(data["root"]) + bundle.root_update_finished() + + top_level_md = [ + (data["timestamp"], bundle.update_timestamp), + (data["snapshot"], bundle.update_snapshot), + (data["targets"], bundle.update_targets), + ] + for metadata, update_func in top_level_md: + # metadata is not json + with self.assertRaises(exceptions.RepositoryError): + update_func(b"") + # metadata is invalid + md = Metadata.from_bytes(metadata) + md.signed.version += 1 + with self.assertRaises(exceptions.RepositoryError): + update_func(json.dumps(md.to_dict()).encode()) + + # metadata is of wrong type + with self.assertRaises(exceptions.RepositoryError): + update_func(data["root"]) + + update_func(metadata) + + + # TODO test updating over initial metadata (new keys, newer timestamp, etc) + # TODO test the actual specification checks + + +if __name__ == '__main__': + utils.configure_test_logging(sys.argv) + unittest.main() diff --git a/tests/test_updater_ng.py b/tests/test_updater_ng.py new file mode 100644 index 00000000..44dae1d0 --- /dev/null +++ b/tests/test_updater_ng.py @@ -0,0 +1,171 @@ +#!/usr/bin/env python + +# Copyright 2021, New York University and the TUF contributors +# SPDX-License-Identifier: MIT OR Apache-2.0 + +"""Test Updater class +""" + +import os +import time +import shutil +import copy +import tempfile +import logging +import errno +import sys +import unittest +import json +import tracemalloc + +if sys.version_info >= (3, 3): + import unittest.mock as mock +else: + import mock + +import tuf +import tuf.exceptions +import tuf.log +import tuf.repository_tool as repo_tool +import tuf.unittest_toolbox as unittest_toolbox + +from tests import utils +from tuf.api import metadata +from tuf import ngclient + +import securesystemslib + +logger = logging.getLogger(__name__) + + +class TestUpdater(unittest_toolbox.Modified_TestCase): + + @classmethod + def setUpClass(cls): + # Create a temporary directory to store the repository, metadata, and target + # files. 'temporary_directory' must be deleted in TearDownModule() so that + # temporary files are always removed, even when exceptions occur. + cls.temporary_directory = tempfile.mkdtemp(dir=os.getcwd()) + + # Needed because in some tests simple_server.py cannot be found. + # The reason is that the current working directory + # has been changed when executing a subprocess. + cls.SIMPLE_SERVER_PATH = os.path.join(os.getcwd(), 'simple_server.py') + + # Launch a SimpleHTTPServer (serves files in the current directory). + # Test cases will request metadata and target files that have been + # pre-generated in 'tuf/tests/repository_data', which will be served + # by the SimpleHTTPServer launched here. The test cases of 'test_updater.py' + # assume the pre-generated metadata files have a specific structure, such + # as a delegated role 'targets/role1', three target files, five key files, + # etc. + cls.server_process_handler = utils.TestServerProcess(log=logger, + server=cls.SIMPLE_SERVER_PATH) + + + + @classmethod + def tearDownClass(cls): + # Cleans the resources and flush the logged lines (if any). + cls.server_process_handler.clean() + + # Remove the temporary repository directory, which should contain all the + # metadata, targets, and key files generated for the test cases + shutil.rmtree(cls.temporary_directory) + + + + def setUp(self): + # We are inheriting from custom class. + unittest_toolbox.Modified_TestCase.setUp(self) + + # Copy the original repository files provided in the test folder so that + # any modifications made to repository files are restricted to the copies. + # The 'repository_data' directory is expected to exist in 'tuf.tests/'. + original_repository_files = os.path.join(os.getcwd(), 'repository_data') + temporary_repository_root = \ + self.make_temp_directory(directory=self.temporary_directory) + + # The original repository, keystore, and client directories will be copied + # for each test case. + original_repository = os.path.join(original_repository_files, 'repository') + original_keystore = os.path.join(original_repository_files, 'keystore') + original_client = os.path.join(original_repository_files, 'client', 'test_repository1', 'metadata', 'current') + + # Save references to the often-needed client repository directories. + # Test cases need these references to access metadata and target files. + self.repository_directory = \ + os.path.join(temporary_repository_root, 'repository') + self.keystore_directory = \ + os.path.join(temporary_repository_root, 'keystore') + + self.client_directory = os.path.join(temporary_repository_root, 'client') + + # Copy the original 'repository', 'client', and 'keystore' directories + # to the temporary repository the test cases can use. + shutil.copytree(original_repository, self.repository_directory) + shutil.copytree(original_client, self.client_directory) + shutil.copytree(original_keystore, self.keystore_directory) + + # 'path/to/tmp/repository' -> 'localhost:8001/tmp/repository'. + repository_basepath = self.repository_directory[len(os.getcwd()):] + url_prefix = 'http://' + utils.TEST_HOST_ADDRESS + ':' \ + + str(self.server_process_handler.port) + repository_basepath + + metadata_url = f"{url_prefix}/metadata/" + targets_url = f"{url_prefix}/targets/" + # Creating a repository instance. The test cases will use this client + # updater to refresh metadata, fetch target files, etc. + self.repository_updater = ngclient.Updater(self.client_directory, + metadata_url, + targets_url) + + def tearDown(self): + # We are inheriting from custom class. + unittest_toolbox.Modified_TestCase.tearDown(self) + + # Logs stdout and stderr from the sever subprocess. + self.server_process_handler.flush_log() + + def test_refresh(self): + # All metadata is in local directory already + self.repository_updater.refresh() + + # Get targetinfo for 'file1.txt' listed in targets + targetinfo1 = self.repository_updater.get_one_valid_targetinfo('file1.txt') + # Get targetinfo for 'file3.txt' listed in the delegated role1 + targetinfo3= self.repository_updater.get_one_valid_targetinfo('file3.txt') + + destination_directory = self.make_temp_directory() + updated_targets = self.repository_updater.updated_targets([targetinfo1, targetinfo3], + destination_directory) + + self.assertListEqual(updated_targets, [targetinfo1, targetinfo3]) + + self.repository_updater.download_target(targetinfo1, destination_directory) + updated_targets = self.repository_updater.updated_targets(updated_targets, + destination_directory) + + self.assertListEqual(updated_targets, [targetinfo3]) + + + self.repository_updater.download_target(targetinfo3, destination_directory) + updated_targets = self.repository_updater.updated_targets(updated_targets, + destination_directory) + + self.assertListEqual(updated_targets, []) + + def test_refresh_with_only_local_root(self): + os.remove(os.path.join(self.client_directory, "timestamp.json")) + os.remove(os.path.join(self.client_directory, "snapshot.json")) + os.remove(os.path.join(self.client_directory, "targets.json")) + os.remove(os.path.join(self.client_directory, "role1.json")) + + self.repository_updater.refresh() + + # Get targetinfo for 'file3.txt' listed in the delegated role1 + targetinfo3= self.repository_updater.get_one_valid_targetinfo('file3.txt') + +if __name__ == '__main__': + utils.configure_test_logging(sys.argv) + unittest.main()