diff --git a/tests/test_repository_lib.py b/tests/test_repository_lib.py index 675cce3e..79482e80 100755 --- a/tests/test_repository_lib.py +++ b/tests/test_repository_lib.py @@ -749,19 +749,17 @@ def test_write_metadata_file(self): root_signable = securesystemslib.util.load_json_file(root_filename) output_filename = os.path.join(temporary_directory, 'root.json') - compression_algorithms = ['gz'] version_number = root_signable['signed']['version'] + 1 self.assertFalse(os.path.exists(output_filename)) repo_lib.write_metadata_file(root_signable, output_filename, version_number, - compression_algorithms, consistent_snapshot=False) + consistent_snapshot=False) self.assertTrue(os.path.exists(output_filename)) - self.assertTrue(os.path.exists(output_filename + '.gz')) # Attempt to over-write the previously written metadata file. An exception # is not raised in this case, only a debug message is logged. repo_lib.write_metadata_file(root_signable, output_filename, version_number, - compression_algorithms, consistent_snapshot=False) + consistent_snapshot=False) # Try to write a consistent metadate file. An exception is not raised in # this case. For testing purposes, root.json should be a hard link to the @@ -769,7 +767,7 @@ def test_write_metadata_file(self): # the latest consistent files. tuf.settings.CONSISTENT_METHOD = 'hard_link' repo_lib.write_metadata_file(root_signable, output_filename, version_number, - compression_algorithms, consistent_snapshot=True) + consistent_snapshot=True) # Test if the consistent files are properly named # Filename format of a consistent file: .rolename.json @@ -780,9 +778,7 @@ def test_write_metadata_file(self): # Try to add more consistent metadata files. version_number += 1 repo_lib.write_metadata_file(root_signable, output_filename, - version_number, - compression_algorithms, - consistent_snapshot=True) + version_number, consistent_snapshot=True) # Test if the the latest root.json points to the expected consistent file # and consistent metadata do not all point to the same root.json @@ -796,76 +792,34 @@ def test_write_metadata_file(self): tuf.settings.CONSISTENT_METHOD = 'somebadidea' self.assertRaises(securesystemslib.exceptions.InvalidConfigurationError, repo_lib.write_metadata_file, root_signable, output_filename, - version_number, compression_algorithms, consistent_snapshot=True) + version_number, consistent_snapshot=True) # Try to create a link to root.json when root.json doesn't exist locally. # repository_lib should log a message if this is the case. tuf.settings.CONSISTENT_METHOD = 'hard_link' os.remove(output_filename) repo_lib.write_metadata_file(root_signable, output_filename, version_number, - compression_algorithms, consistent_snapshot=True) + consistent_snapshot=True) # Reset CONSISTENT_METHOD so that subsequent tests work as expected. tuf.settings.CONSISTENT_METHOD = 'copy' - # Test for unknown compression algorithm. - self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.write_metadata_file, - root_signable, output_filename, version_number, compression_algorithms=['bad_algo'], - consistent_snapshot=False) - # Test improperly formatted arguments. self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.write_metadata_file, - 3, output_filename, version_number, - compression_algorithms, False) + 3, output_filename, version_number, False) self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.write_metadata_file, - root_signable, 3, version_number, compression_algorithms, - False) + root_signable, 3, version_number, False) self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.write_metadata_file, - root_signable, output_filename, '3', - compression_algorithms, False) + root_signable, output_filename, '3', False) self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.write_metadata_file, - root_signable, output_filename, version_number, - compression_algorithms, 3) + root_signable, output_filename, version_number, 3) - def test__write_compressed_metadata(self): - # Test for invalid 'compressed_filename' argument and set - # 'write_new_metadata' to False. - file_object = securesystemslib.util.TempFile() - existing_filename = os.path.join('repository_data', 'repository', - 'metadata', 'root.json') - - write_new_metadata = False - repo_lib._write_compressed_metadata(file_object, - compressed_filename=existing_filename, - write_new_metadata=write_new_metadata, - consistent_snapshot=False, - version_number=8) - - # Test writing of compressed metadata when consistent snapshots is enabled. - file_object = securesystemslib.util.TempFile() - shutil.copy(existing_filename, os.path.join(self.temporary_directory, '8.root.json.gz')) - shutil.copy(existing_filename, os.path.join(self.temporary_directory, '8.root.json.zip')) - shutil.copy(existing_filename, os.path.join(self.temporary_directory, 'root.json.zip')) - compressed_filename = os.path.join(self.temporary_directory, 'root.json.gz') - - # For testing purposes, add additional compression algorithms to - # repo_lib.SUPPORTED_COMPRESSION_EXTENSIONS. - repo_lib.SUPPORTED_COMPRESSION_EXTENSIONS = ['gz', 'zip', 'bz2'] - repo_lib._write_compressed_metadata(file_object, - compressed_filename=compressed_filename, - write_new_metadata=True, - consistent_snapshot=True, - version_number=8) - repo_lib.SUPPORTED_COMPRESSION_EXTENSIONS = ['gz'] - - def test_create_tuf_client_directory(self): # Test normal case. temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory) - repository_directory = os.path.join('repository_data', - 'repository') + repository_directory = os.path.join('repository_data', 'repository') client_directory = os.path.join(temporary_directory, 'client') repo_lib.create_tuf_client_directory(repository_directory, client_directory) @@ -881,15 +835,16 @@ def test_create_tuf_client_directory(self): # Test improperly formatted arguments. - self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.create_tuf_client_directory, - 3, client_directory) - self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.create_tuf_client_directory, - repository_directory, 3) + self.assertRaises(securesystemslib.exceptions.FormatError, + repo_lib.create_tuf_client_directory, 3, client_directory) + self.assertRaises(securesystemslib.exceptions.FormatError, + repo_lib.create_tuf_client_directory, repository_directory, 3) # Test invalid argument (i.e., client directory already exists.) - self.assertRaises(securesystemslib.exceptions.RepositoryError, repo_lib.create_tuf_client_directory, - repository_directory, client_directory) + self.assertRaises(securesystemslib.exceptions.RepositoryError, + repo_lib.create_tuf_client_directory, repository_directory, + client_directory) # Test invalid client metadata directory (i.e., non-errno.EEXIST exceptions # should be re-raised.) @@ -903,7 +858,7 @@ def test_create_tuf_client_directory(self): os.chmod(client_directory, current_client_directory_mode & ~stat.S_IWUSR) self.assertRaises(OSError, repo_lib.create_tuf_client_directory, - repository_directory, client_directory) + repository_directory, client_directory) # Reset the client directory's mode. os.chmod(client_directory, current_client_directory_mode) @@ -912,7 +867,8 @@ def test_create_tuf_client_directory(self): def test__check_directory(self): # Test for non-existent directory. - self.assertRaises(securesystemslib.exceptions.Error, repo_lib._check_directory, 'non-existent') + self.assertRaises(securesystemslib.exceptions.Error, + repo_lib._check_directory, 'non-existent') @@ -954,8 +910,7 @@ def test__generate_and_write_metadata(self): repo_lib._generate_and_write_metadata('obsolete_role', obsolete_metadata, targets_directory, metadata_directory, consistent_snapshot=False, - filenames=None, compression_algorithms=['gz'], - repository_name=repository_name) + filenames=None, repository_name=repository_name) snapshot_filepath = os.path.join('repository_data', 'repository', 'metadata', 'snapshot.json') @@ -1022,20 +977,13 @@ def test__load_top_level_metadata(self): signable = securesystemslib.util.load_json_file(os.path.join(metadata_directory, 'root.json')) signable['signatures'].append(signable['signatures'][0]) - repo_lib.write_metadata_file(signable, root_file, 8, ['gz'], False) + repo_lib.write_metadata_file(signable, root_file, 8, False) # Attempt to load a repository that contains a compressed Root file. repository = repo_tool.create_new_repository(repository_directory, repository_name) filenames = repo_lib.get_metadata_filenames(metadata_directory) repo_lib._load_top_level_metadata(repository, filenames, repository_name) - # Remove compressed metadata so that we can test for loading of a - # repository with no compression enabled. - for role_file in os.listdir(metadata_directory): - if role_file.endswith('.json.gz'): - role_filename = os.path.join(metadata_directory, role_file) - os.remove(role_filename) - filenames = repo_lib.get_metadata_filenames(metadata_directory) repository = repo_tool.create_new_repository(repository_directory, repository_name) repo_lib._load_top_level_metadata(repository, filenames, repository_name) diff --git a/tuf/repository_lib.py b/tuf/repository_lib.py index c189dbf5..703535de 100755 --- a/tuf/repository_lib.py +++ b/tuf/repository_lib.py @@ -96,9 +96,6 @@ # Supported key types. SUPPORTED_KEY_TYPES = ['rsa', 'ed25519'] -# The recognized compression extensions. -SUPPORTED_COMPRESSION_EXTENSIONS = ['.gz'] - # The full list of supported TUF metadata extensions. METADATA_EXTENSIONS = ['.json.gz', '.json'] @@ -107,12 +104,9 @@ def _generate_and_write_metadata(rolename, metadata_filename, - targets_directory, metadata_directory, - consistent_snapshot=False, filenames=None, - compression_algorithms=['gz'], - allow_partially_signed=False, - increment_version_number=True, - repository_name='default'): + targets_directory, metadata_directory, consistent_snapshot=False, + filenames=None, allow_partially_signed=False, increment_version_number=True, + repository_name='default'): """ Non-public function that can generate and write the metadata for the specified 'rolename'. It also increments the version number of 'rolename' if @@ -131,7 +125,7 @@ def _generate_and_write_metadata(rolename, metadata_filename, # Generate the appropriate role metadata for 'rolename'. if rolename == 'root': metadata = generate_root_metadata(roleinfo['version'], roleinfo['expires'], - consistent_snapshot, compression_algorithms, repository_name) + consistent_snapshot, repository_name) _log_warning_if_expires_soon(ROOT_FILENAME, roleinfo['expires'], ROOT_EXPIRES_WARN_SECONDS) @@ -221,7 +215,7 @@ def should_write(): if rolename == 'root': consistent_snapshot = True filename = write_metadata_file(signable, metadata_filename, - metadata['version'], compression_algorithms, consistent_snapshot) + metadata['version'], consistent_snapshot) # 'signable' contains an invalid threshold of signatures. else: @@ -247,13 +241,11 @@ def should_write(): # .root.json and root.json). if rolename == 'root': filename = write_metadata_file(signable, metadata_filename, - metadata['version'], compression_algorithms, - consistent_snapshot=True) + metadata['version'], consistent_snapshot=True) else: filename = write_metadata_file(signable, metadata_filename, - metadata['version'], compression_algorithms, - consistent_snapshot) + metadata['version'], consistent_snapshot) return signable, filename @@ -580,9 +572,6 @@ def _load_top_level_metadata(repository, top_level_filenames, repository_name): logger.debug('Found a Root signature that is already loaded:' ' ' + repr(signature)) - if os.path.exists(root_filename + '.gz'): - roleinfo['compressions'].append('gz') - else: logger.debug('A compressed Root file was not found.') @@ -620,11 +609,6 @@ def _load_top_level_metadata(repository, top_level_filenames, repository_name): roleinfo = tuf.roledb.get_roleinfo('timestamp', repository_name) roleinfo['expires'] = timestamp_metadata['expires'] roleinfo['version'] = timestamp_metadata['version'] - if os.path.exists(timestamp_filename + '.gz'): - roleinfo['compressions'].append('gz') - - else: - logger.debug('A compressed Timestamp file was not found.') if _metadata_is_partially_loaded('timestamp', signable, roleinfo, repository_name): roleinfo['partial_loaded'] = True @@ -665,11 +649,6 @@ def _load_top_level_metadata(repository, top_level_filenames, repository_name): roleinfo = tuf.roledb.get_roleinfo('snapshot', repository_name) roleinfo['expires'] = snapshot_metadata['expires'] roleinfo['version'] = snapshot_metadata['version'] - if os.path.exists(snapshot_filename + '.gz'): - roleinfo['compressions'].append('gz') - - else: - logger.debug('A compressed Snapshot file was not loaded.') if _metadata_is_partially_loaded('snapshot', signable, roleinfo, repository_name): roleinfo['partial_loaded'] = True @@ -708,11 +687,6 @@ def _load_top_level_metadata(repository, top_level_filenames, repository_name): roleinfo['version'] = targets_metadata['version'] roleinfo['expires'] = targets_metadata['expires'] roleinfo['delegations'] = targets_metadata['delegations'] - if os.path.exists(targets_filename + '.gz'): - roleinfo['compressions'].append('gz') - - else: - logger.debug('Compressed Targets file cannot be loaded.') if _metadata_is_partially_loaded('targets', signable, roleinfo, repository_name): roleinfo['partial_loaded'] = True @@ -1226,7 +1200,7 @@ def get_target_hash(target_filepath): def generate_root_metadata(version, expiration_date, consistent_snapshot, - compression_algorithms=['gz'], repository_name='default'): + repository_name='default'): """ Create the root metadata. 'tuf.roledb.py' and 'tuf.keydb.py' @@ -1248,11 +1222,6 @@ def generate_root_metadata(version, expiration_date, consistent_snapshot, filename of any target file located in the targets directory. Each digest is stripped from the target filename and listed in the snapshot metadata. - compression_algorithms: - A list of compression algorithms to use when generating the compressed - metadata files for the repository. The root file specifies the - algorithms used by the repository. - repository_name: The name of the repository. If not supplied, 'rolename' is added to the 'default' repository. @@ -1280,7 +1249,6 @@ def generate_root_metadata(version, expiration_date, consistent_snapshot, tuf.formats.METADATAVERSION_SCHEMA.check_match(version) securesystemslib.formats.ISO8601_DATETIME_SCHEMA.check_match(expiration_date) securesystemslib.formats.BOOLEAN_SCHEMA.check_match(consistent_snapshot) - tuf.formats.COMPRESSIONS_SCHEMA.check_match(compression_algorithms) securesystemslib.formats.NAME_SCHEMA.check_match(repository_name) # The role and key dictionaries to be saved in the root metadata object. @@ -1343,9 +1311,7 @@ def generate_root_metadata(version, expiration_date, consistent_snapshot, # Generate the root metadata object. root_metadata = tuf.formats.RootFile.make_metadata(version, expiration_date, - keydict, roledict, - consistent_snapshot, - compression_algorithms) + keydict, roledict, consistent_snapshot) return root_metadata @@ -1754,16 +1720,10 @@ def sign_metadata(metadata_object, keyids, filename, repository_name): -def write_metadata_file(metadata, filename, version_number, - compression_algorithms, consistent_snapshot): +def write_metadata_file(metadata, filename, version_number, consistent_snapshot): """ - If necessary, write the 'metadata' signable object to 'filename', and the - compressed version of the metadata file if 'compression' is set. - - Note: Compression algorithms like gzip attach a timestamp to compressed - files, so a metadata file compressed multiple times may generate different - digests even though the uncompressed content has not changed. + If necessary, write the 'metadata' signable object to 'filename'. metadata: @@ -1772,27 +1732,22 @@ def write_metadata_file(metadata, filename, version_number, filename: The filename of the metadata to be written (e.g., 'root.json'). - If a compression algorithm is specified in 'compression_algorithms', the - compression extention is appended to 'filename'. version_number: The version number of the metadata file to be written. The version number is needed for consistent snapshots, which prepend the version number to 'filename'. - compression_algorithms: - Specify the algorithms, as a list of strings, used to compress the - 'metadata'; The only currently available compression option is 'gz' - (gzip). - consistent_snapshot: Boolean that determines whether the metadata file's digest should be prepended to the filename. - securesystemslib.exceptions.FormatError, if the arguments are improperly formatted. + securesystemslib.exceptions.FormatError, if the arguments are improperly + formatted. - securesystemslib.exceptions.Error, if the directory of 'filename' does not exist. + securesystemslib.exceptions.Error, if the directory of 'filename' does not + exist. Any other runtime (e.g., IO) exception. @@ -1811,7 +1766,6 @@ def write_metadata_file(metadata, filename, version_number, tuf.formats.SIGNABLE_SCHEMA.check_match(metadata) securesystemslib.formats.PATH_SCHEMA.check_match(filename) tuf.formats.METADATAVERSION_SCHEMA.check_match(version_number) - tuf.formats.COMPRESSIONS_SCHEMA.check_match(compression_algorithms) securesystemslib.formats.BOOLEAN_SCHEMA.check_match(consistent_snapshot) # Verify the directory of 'filename', and convert 'filename' to its absolute @@ -1878,115 +1832,21 @@ def write_metadata_file(metadata, filename, version_number, os.link(written_consistent_filename, written_filename) else: - raise securesystemslib.exceptions.InvalidConfigurationError('The consistent method specified' - ' in tuf.settings.py is not supported, try either "copy" or "hard_link"') + raise securesystemslib.exceptions.InvalidConfigurationError('The' + ' consistent method specified in tuf.settings.py is not supported, try' + ' either "copy" or "hard_link"') else: logger.debug('Not creating a consistent snapshot for ' + repr(written_filename)) logger.debug('Saving ' + repr(written_filename)) file_object.move(written_filename) - # Generate the compressed versions of 'metadata', if necessary. A compressed - # file may be written (without needing to write the uncompressed version) if - # the repository maintainer adds compression after writing the uncompressed - # version. - for compression_algorithm in compression_algorithms: - file_object = None - - # Ignore the empty string that signifies non-compression. The uncompressed - # file was previously written above, if necessary. - if not len(compression_algorithm): - continue - - elif compression_algorithm == 'gz': - file_object = securesystemslib.util.TempFile() - compressed_filename = filename + '.gz' - - # Instantiate a gzip object, but save compressed content to - # 'file_object' (i.e., GzipFile instance is based on its 'fileobj' - # argument). - gzip_object = gzip.GzipFile(fileobj=file_object, mode='wb') - try: - gzip_object.write(file_content) - - finally: - gzip_object.close() - - # This else clause should not be reached because the - # 'compression_algorithms' list is validated against the - # COMPRESSIONS_SCHEMA above. - else: # pragma: no cover - raise securesystemslib.exceptions.FormatError('Unknown compression algorithm:' - ' ' + repr(compression_algorithm)) - - # Save the compressed version, ensuring an unchanged file is not re-saved. - # Re-saving the same compressed version may cause its digest to - # unexpectedly change (gzip includes a timestamp) even though content has - # not changed. - _write_compressed_metadata(file_object, compressed_filename, - True, consistent_snapshot, - version_number) return written_filename -def _write_compressed_metadata(file_object, compressed_filename, - write_new_metadata, consistent_snapshot, version_number): - """ - Write compressed versions of metadata, ensuring compressed file that have - not changed are not re-written, the digest of the compressed file is properly - added to the compressed filename, and consistent snapshots are also saved. - Ensure compressed files are written to a temporary location, and then - moved to their destinations. - """ - - # If a consistent snapshot is unneeded, 'file_object' may be simply moved - # 'compressed_filename' if not already written. - if not consistent_snapshot: - if write_new_metadata or not os.path.exists(compressed_filename): - file_object.move(compressed_filename) - - # The temporary file must be closed if 'file_object.move()' is not used. - # securesystemslib.util.TempFile() automatically closes the temp file when move() is - # called - else: - file_object.close_temp_file() - - # consistent snapshots = True. Ensure the version number is included in the - # compressed filename written, provided it does not already exist. - else: - compressed_content = file_object.read() - consistent_filename = None - version_and_filename = None - - # Attach the version number to the compressed, consistent snapshot filename. - dirname, basename = os.path.split(compressed_filename) - - for compression_extension in SUPPORTED_COMPRESSION_EXTENSIONS: - if basename.endswith(compression_extension): - basename = basename.split(compression_extension, 1)[0] - version_and_filename = str(version_number) + '.' + basename + compression_extension - consistent_filename = os.path.join(dirname, version_and_filename) - - else: - logger.debug('Skipping compression extension: ' + repr(compression_extension)) - - # Move the 'securesystemslib.util.TempFile' object to one of the filenames so that it is - # saved and the temporary file closed. - if not os.path.exists(consistent_filename): - logger.debug('Saving ' + repr(consistent_filename)) - file_object.move(consistent_filename) - - else: - logger.debug('Skipping already written compressed file:' - ' ' + repr(consistent_filename)) - - - - - def _log_status_of_top_level_roles(targets_directory, metadata_directory, repository_name): """