mirror of
https://github.com/theupdateframework/python-tuf
synced 2026-05-24 10:08:28 +00:00
Remove compression logic from repository_lib.py and edit its unit tests
This commit is contained in:
parent
9fe00756ef
commit
60cd55229f
2 changed files with 41 additions and 233 deletions
|
|
@ -749,19 +749,17 @@ def test_write_metadata_file(self):
|
|||
root_signable = securesystemslib.util.load_json_file(root_filename)
|
||||
|
||||
output_filename = os.path.join(temporary_directory, 'root.json')
|
||||
compression_algorithms = ['gz']
|
||||
version_number = root_signable['signed']['version'] + 1
|
||||
|
||||
self.assertFalse(os.path.exists(output_filename))
|
||||
repo_lib.write_metadata_file(root_signable, output_filename, version_number,
|
||||
compression_algorithms, consistent_snapshot=False)
|
||||
consistent_snapshot=False)
|
||||
self.assertTrue(os.path.exists(output_filename))
|
||||
self.assertTrue(os.path.exists(output_filename + '.gz'))
|
||||
|
||||
# Attempt to over-write the previously written metadata file. An exception
|
||||
# is not raised in this case, only a debug message is logged.
|
||||
repo_lib.write_metadata_file(root_signable, output_filename, version_number,
|
||||
compression_algorithms, consistent_snapshot=False)
|
||||
consistent_snapshot=False)
|
||||
|
||||
# Try to write a consistent metadate file. An exception is not raised in
|
||||
# this case. For testing purposes, root.json should be a hard link to the
|
||||
|
|
@ -769,7 +767,7 @@ def test_write_metadata_file(self):
|
|||
# the latest consistent files.
|
||||
tuf.settings.CONSISTENT_METHOD = 'hard_link'
|
||||
repo_lib.write_metadata_file(root_signable, output_filename, version_number,
|
||||
compression_algorithms, consistent_snapshot=True)
|
||||
consistent_snapshot=True)
|
||||
|
||||
# Test if the consistent files are properly named
|
||||
# Filename format of a consistent file: <version number>.rolename.json
|
||||
|
|
@ -780,9 +778,7 @@ def test_write_metadata_file(self):
|
|||
# Try to add more consistent metadata files.
|
||||
version_number += 1
|
||||
repo_lib.write_metadata_file(root_signable, output_filename,
|
||||
version_number,
|
||||
compression_algorithms,
|
||||
consistent_snapshot=True)
|
||||
version_number, consistent_snapshot=True)
|
||||
|
||||
# Test if the the latest root.json points to the expected consistent file
|
||||
# and consistent metadata do not all point to the same root.json
|
||||
|
|
@ -796,76 +792,34 @@ def test_write_metadata_file(self):
|
|||
tuf.settings.CONSISTENT_METHOD = 'somebadidea'
|
||||
self.assertRaises(securesystemslib.exceptions.InvalidConfigurationError,
|
||||
repo_lib.write_metadata_file, root_signable, output_filename,
|
||||
version_number, compression_algorithms, consistent_snapshot=True)
|
||||
version_number, consistent_snapshot=True)
|
||||
|
||||
# Try to create a link to root.json when root.json doesn't exist locally.
|
||||
# repository_lib should log a message if this is the case.
|
||||
tuf.settings.CONSISTENT_METHOD = 'hard_link'
|
||||
os.remove(output_filename)
|
||||
repo_lib.write_metadata_file(root_signable, output_filename, version_number,
|
||||
compression_algorithms, consistent_snapshot=True)
|
||||
consistent_snapshot=True)
|
||||
|
||||
# Reset CONSISTENT_METHOD so that subsequent tests work as expected.
|
||||
tuf.settings.CONSISTENT_METHOD = 'copy'
|
||||
|
||||
# Test for unknown compression algorithm.
|
||||
self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.write_metadata_file,
|
||||
root_signable, output_filename, version_number, compression_algorithms=['bad_algo'],
|
||||
consistent_snapshot=False)
|
||||
|
||||
# Test improperly formatted arguments.
|
||||
self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.write_metadata_file,
|
||||
3, output_filename, version_number,
|
||||
compression_algorithms, False)
|
||||
3, output_filename, version_number, False)
|
||||
self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.write_metadata_file,
|
||||
root_signable, 3, version_number, compression_algorithms,
|
||||
False)
|
||||
root_signable, 3, version_number, False)
|
||||
self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.write_metadata_file,
|
||||
root_signable, output_filename, '3',
|
||||
compression_algorithms, False)
|
||||
root_signable, output_filename, '3', False)
|
||||
self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.write_metadata_file,
|
||||
root_signable, output_filename, version_number,
|
||||
compression_algorithms, 3)
|
||||
root_signable, output_filename, version_number, 3)
|
||||
|
||||
|
||||
|
||||
def test__write_compressed_metadata(self):
|
||||
# Test for invalid 'compressed_filename' argument and set
|
||||
# 'write_new_metadata' to False.
|
||||
file_object = securesystemslib.util.TempFile()
|
||||
existing_filename = os.path.join('repository_data', 'repository',
|
||||
'metadata', 'root.json')
|
||||
|
||||
write_new_metadata = False
|
||||
repo_lib._write_compressed_metadata(file_object,
|
||||
compressed_filename=existing_filename,
|
||||
write_new_metadata=write_new_metadata,
|
||||
consistent_snapshot=False,
|
||||
version_number=8)
|
||||
|
||||
# Test writing of compressed metadata when consistent snapshots is enabled.
|
||||
file_object = securesystemslib.util.TempFile()
|
||||
shutil.copy(existing_filename, os.path.join(self.temporary_directory, '8.root.json.gz'))
|
||||
shutil.copy(existing_filename, os.path.join(self.temporary_directory, '8.root.json.zip'))
|
||||
shutil.copy(existing_filename, os.path.join(self.temporary_directory, 'root.json.zip'))
|
||||
compressed_filename = os.path.join(self.temporary_directory, 'root.json.gz')
|
||||
|
||||
# For testing purposes, add additional compression algorithms to
|
||||
# repo_lib.SUPPORTED_COMPRESSION_EXTENSIONS.
|
||||
repo_lib.SUPPORTED_COMPRESSION_EXTENSIONS = ['gz', 'zip', 'bz2']
|
||||
repo_lib._write_compressed_metadata(file_object,
|
||||
compressed_filename=compressed_filename,
|
||||
write_new_metadata=True,
|
||||
consistent_snapshot=True,
|
||||
version_number=8)
|
||||
repo_lib.SUPPORTED_COMPRESSION_EXTENSIONS = ['gz']
|
||||
|
||||
|
||||
def test_create_tuf_client_directory(self):
|
||||
# Test normal case.
|
||||
temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory)
|
||||
repository_directory = os.path.join('repository_data',
|
||||
'repository')
|
||||
repository_directory = os.path.join('repository_data', 'repository')
|
||||
client_directory = os.path.join(temporary_directory, 'client')
|
||||
|
||||
repo_lib.create_tuf_client_directory(repository_directory, client_directory)
|
||||
|
|
@ -881,15 +835,16 @@ def test_create_tuf_client_directory(self):
|
|||
|
||||
|
||||
# Test improperly formatted arguments.
|
||||
self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.create_tuf_client_directory,
|
||||
3, client_directory)
|
||||
self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.create_tuf_client_directory,
|
||||
repository_directory, 3)
|
||||
self.assertRaises(securesystemslib.exceptions.FormatError,
|
||||
repo_lib.create_tuf_client_directory, 3, client_directory)
|
||||
self.assertRaises(securesystemslib.exceptions.FormatError,
|
||||
repo_lib.create_tuf_client_directory, repository_directory, 3)
|
||||
|
||||
|
||||
# Test invalid argument (i.e., client directory already exists.)
|
||||
self.assertRaises(securesystemslib.exceptions.RepositoryError, repo_lib.create_tuf_client_directory,
|
||||
repository_directory, client_directory)
|
||||
self.assertRaises(securesystemslib.exceptions.RepositoryError,
|
||||
repo_lib.create_tuf_client_directory, repository_directory,
|
||||
client_directory)
|
||||
|
||||
# Test invalid client metadata directory (i.e., non-errno.EEXIST exceptions
|
||||
# should be re-raised.)
|
||||
|
|
@ -903,7 +858,7 @@ def test_create_tuf_client_directory(self):
|
|||
os.chmod(client_directory, current_client_directory_mode & ~stat.S_IWUSR)
|
||||
|
||||
self.assertRaises(OSError, repo_lib.create_tuf_client_directory,
|
||||
repository_directory, client_directory)
|
||||
repository_directory, client_directory)
|
||||
|
||||
# Reset the client directory's mode.
|
||||
os.chmod(client_directory, current_client_directory_mode)
|
||||
|
|
@ -912,7 +867,8 @@ def test_create_tuf_client_directory(self):
|
|||
|
||||
def test__check_directory(self):
|
||||
# Test for non-existent directory.
|
||||
self.assertRaises(securesystemslib.exceptions.Error, repo_lib._check_directory, 'non-existent')
|
||||
self.assertRaises(securesystemslib.exceptions.Error,
|
||||
repo_lib._check_directory, 'non-existent')
|
||||
|
||||
|
||||
|
||||
|
|
@ -954,8 +910,7 @@ def test__generate_and_write_metadata(self):
|
|||
|
||||
repo_lib._generate_and_write_metadata('obsolete_role', obsolete_metadata,
|
||||
targets_directory, metadata_directory, consistent_snapshot=False,
|
||||
filenames=None, compression_algorithms=['gz'],
|
||||
repository_name=repository_name)
|
||||
filenames=None, repository_name=repository_name)
|
||||
|
||||
snapshot_filepath = os.path.join('repository_data', 'repository',
|
||||
'metadata', 'snapshot.json')
|
||||
|
|
@ -1022,20 +977,13 @@ def test__load_top_level_metadata(self):
|
|||
signable = securesystemslib.util.load_json_file(os.path.join(metadata_directory, 'root.json'))
|
||||
signable['signatures'].append(signable['signatures'][0])
|
||||
|
||||
repo_lib.write_metadata_file(signable, root_file, 8, ['gz'], False)
|
||||
repo_lib.write_metadata_file(signable, root_file, 8, False)
|
||||
|
||||
# Attempt to load a repository that contains a compressed Root file.
|
||||
repository = repo_tool.create_new_repository(repository_directory, repository_name)
|
||||
filenames = repo_lib.get_metadata_filenames(metadata_directory)
|
||||
repo_lib._load_top_level_metadata(repository, filenames, repository_name)
|
||||
|
||||
# Remove compressed metadata so that we can test for loading of a
|
||||
# repository with no compression enabled.
|
||||
for role_file in os.listdir(metadata_directory):
|
||||
if role_file.endswith('.json.gz'):
|
||||
role_filename = os.path.join(metadata_directory, role_file)
|
||||
os.remove(role_filename)
|
||||
|
||||
filenames = repo_lib.get_metadata_filenames(metadata_directory)
|
||||
repository = repo_tool.create_new_repository(repository_directory, repository_name)
|
||||
repo_lib._load_top_level_metadata(repository, filenames, repository_name)
|
||||
|
|
|
|||
|
|
@ -96,9 +96,6 @@
|
|||
# Supported key types.
|
||||
SUPPORTED_KEY_TYPES = ['rsa', 'ed25519']
|
||||
|
||||
# The recognized compression extensions.
|
||||
SUPPORTED_COMPRESSION_EXTENSIONS = ['.gz']
|
||||
|
||||
# The full list of supported TUF metadata extensions.
|
||||
METADATA_EXTENSIONS = ['.json.gz', '.json']
|
||||
|
||||
|
|
@ -107,12 +104,9 @@
|
|||
|
||||
|
||||
def _generate_and_write_metadata(rolename, metadata_filename,
|
||||
targets_directory, metadata_directory,
|
||||
consistent_snapshot=False, filenames=None,
|
||||
compression_algorithms=['gz'],
|
||||
allow_partially_signed=False,
|
||||
increment_version_number=True,
|
||||
repository_name='default'):
|
||||
targets_directory, metadata_directory, consistent_snapshot=False,
|
||||
filenames=None, allow_partially_signed=False, increment_version_number=True,
|
||||
repository_name='default'):
|
||||
"""
|
||||
Non-public function that can generate and write the metadata for the
|
||||
specified 'rolename'. It also increments the version number of 'rolename' if
|
||||
|
|
@ -131,7 +125,7 @@ def _generate_and_write_metadata(rolename, metadata_filename,
|
|||
# Generate the appropriate role metadata for 'rolename'.
|
||||
if rolename == 'root':
|
||||
metadata = generate_root_metadata(roleinfo['version'], roleinfo['expires'],
|
||||
consistent_snapshot, compression_algorithms, repository_name)
|
||||
consistent_snapshot, repository_name)
|
||||
|
||||
_log_warning_if_expires_soon(ROOT_FILENAME, roleinfo['expires'],
|
||||
ROOT_EXPIRES_WARN_SECONDS)
|
||||
|
|
@ -221,7 +215,7 @@ def should_write():
|
|||
if rolename == 'root':
|
||||
consistent_snapshot = True
|
||||
filename = write_metadata_file(signable, metadata_filename,
|
||||
metadata['version'], compression_algorithms, consistent_snapshot)
|
||||
metadata['version'], consistent_snapshot)
|
||||
|
||||
# 'signable' contains an invalid threshold of signatures.
|
||||
else:
|
||||
|
|
@ -247,13 +241,11 @@ def should_write():
|
|||
# <version>.root.json and root.json).
|
||||
if rolename == 'root':
|
||||
filename = write_metadata_file(signable, metadata_filename,
|
||||
metadata['version'], compression_algorithms,
|
||||
consistent_snapshot=True)
|
||||
metadata['version'], consistent_snapshot=True)
|
||||
|
||||
else:
|
||||
filename = write_metadata_file(signable, metadata_filename,
|
||||
metadata['version'], compression_algorithms,
|
||||
consistent_snapshot)
|
||||
metadata['version'], consistent_snapshot)
|
||||
|
||||
return signable, filename
|
||||
|
||||
|
|
@ -580,9 +572,6 @@ def _load_top_level_metadata(repository, top_level_filenames, repository_name):
|
|||
logger.debug('Found a Root signature that is already loaded:'
|
||||
' ' + repr(signature))
|
||||
|
||||
if os.path.exists(root_filename + '.gz'):
|
||||
roleinfo['compressions'].append('gz')
|
||||
|
||||
else:
|
||||
logger.debug('A compressed Root file was not found.')
|
||||
|
||||
|
|
@ -620,11 +609,6 @@ def _load_top_level_metadata(repository, top_level_filenames, repository_name):
|
|||
roleinfo = tuf.roledb.get_roleinfo('timestamp', repository_name)
|
||||
roleinfo['expires'] = timestamp_metadata['expires']
|
||||
roleinfo['version'] = timestamp_metadata['version']
|
||||
if os.path.exists(timestamp_filename + '.gz'):
|
||||
roleinfo['compressions'].append('gz')
|
||||
|
||||
else:
|
||||
logger.debug('A compressed Timestamp file was not found.')
|
||||
|
||||
if _metadata_is_partially_loaded('timestamp', signable, roleinfo, repository_name):
|
||||
roleinfo['partial_loaded'] = True
|
||||
|
|
@ -665,11 +649,6 @@ def _load_top_level_metadata(repository, top_level_filenames, repository_name):
|
|||
roleinfo = tuf.roledb.get_roleinfo('snapshot', repository_name)
|
||||
roleinfo['expires'] = snapshot_metadata['expires']
|
||||
roleinfo['version'] = snapshot_metadata['version']
|
||||
if os.path.exists(snapshot_filename + '.gz'):
|
||||
roleinfo['compressions'].append('gz')
|
||||
|
||||
else:
|
||||
logger.debug('A compressed Snapshot file was not loaded.')
|
||||
|
||||
if _metadata_is_partially_loaded('snapshot', signable, roleinfo, repository_name):
|
||||
roleinfo['partial_loaded'] = True
|
||||
|
|
@ -708,11 +687,6 @@ def _load_top_level_metadata(repository, top_level_filenames, repository_name):
|
|||
roleinfo['version'] = targets_metadata['version']
|
||||
roleinfo['expires'] = targets_metadata['expires']
|
||||
roleinfo['delegations'] = targets_metadata['delegations']
|
||||
if os.path.exists(targets_filename + '.gz'):
|
||||
roleinfo['compressions'].append('gz')
|
||||
|
||||
else:
|
||||
logger.debug('Compressed Targets file cannot be loaded.')
|
||||
|
||||
if _metadata_is_partially_loaded('targets', signable, roleinfo, repository_name):
|
||||
roleinfo['partial_loaded'] = True
|
||||
|
|
@ -1226,7 +1200,7 @@ def get_target_hash(target_filepath):
|
|||
|
||||
|
||||
def generate_root_metadata(version, expiration_date, consistent_snapshot,
|
||||
compression_algorithms=['gz'], repository_name='default'):
|
||||
repository_name='default'):
|
||||
"""
|
||||
<Purpose>
|
||||
Create the root metadata. 'tuf.roledb.py' and 'tuf.keydb.py'
|
||||
|
|
@ -1248,11 +1222,6 @@ def generate_root_metadata(version, expiration_date, consistent_snapshot,
|
|||
filename of any target file located in the targets directory. Each digest
|
||||
is stripped from the target filename and listed in the snapshot metadata.
|
||||
|
||||
compression_algorithms:
|
||||
A list of compression algorithms to use when generating the compressed
|
||||
metadata files for the repository. The root file specifies the
|
||||
algorithms used by the repository.
|
||||
|
||||
repository_name:
|
||||
The name of the repository. If not supplied, 'rolename' is added to the
|
||||
'default' repository.
|
||||
|
|
@ -1280,7 +1249,6 @@ def generate_root_metadata(version, expiration_date, consistent_snapshot,
|
|||
tuf.formats.METADATAVERSION_SCHEMA.check_match(version)
|
||||
securesystemslib.formats.ISO8601_DATETIME_SCHEMA.check_match(expiration_date)
|
||||
securesystemslib.formats.BOOLEAN_SCHEMA.check_match(consistent_snapshot)
|
||||
tuf.formats.COMPRESSIONS_SCHEMA.check_match(compression_algorithms)
|
||||
securesystemslib.formats.NAME_SCHEMA.check_match(repository_name)
|
||||
|
||||
# The role and key dictionaries to be saved in the root metadata object.
|
||||
|
|
@ -1343,9 +1311,7 @@ def generate_root_metadata(version, expiration_date, consistent_snapshot,
|
|||
|
||||
# Generate the root metadata object.
|
||||
root_metadata = tuf.formats.RootFile.make_metadata(version, expiration_date,
|
||||
keydict, roledict,
|
||||
consistent_snapshot,
|
||||
compression_algorithms)
|
||||
keydict, roledict, consistent_snapshot)
|
||||
|
||||
return root_metadata
|
||||
|
||||
|
|
@ -1754,16 +1720,10 @@ def sign_metadata(metadata_object, keyids, filename, repository_name):
|
|||
|
||||
|
||||
|
||||
def write_metadata_file(metadata, filename, version_number,
|
||||
compression_algorithms, consistent_snapshot):
|
||||
def write_metadata_file(metadata, filename, version_number, consistent_snapshot):
|
||||
"""
|
||||
<Purpose>
|
||||
If necessary, write the 'metadata' signable object to 'filename', and the
|
||||
compressed version of the metadata file if 'compression' is set.
|
||||
|
||||
Note: Compression algorithms like gzip attach a timestamp to compressed
|
||||
files, so a metadata file compressed multiple times may generate different
|
||||
digests even though the uncompressed content has not changed.
|
||||
If necessary, write the 'metadata' signable object to 'filename'.
|
||||
|
||||
<Arguments>
|
||||
metadata:
|
||||
|
|
@ -1772,27 +1732,22 @@ def write_metadata_file(metadata, filename, version_number,
|
|||
|
||||
filename:
|
||||
The filename of the metadata to be written (e.g., 'root.json').
|
||||
If a compression algorithm is specified in 'compression_algorithms', the
|
||||
compression extention is appended to 'filename'.
|
||||
|
||||
version_number:
|
||||
The version number of the metadata file to be written. The version
|
||||
number is needed for consistent snapshots, which prepend the version
|
||||
number to 'filename'.
|
||||
|
||||
compression_algorithms:
|
||||
Specify the algorithms, as a list of strings, used to compress the
|
||||
'metadata'; The only currently available compression option is 'gz'
|
||||
(gzip).
|
||||
|
||||
consistent_snapshot:
|
||||
Boolean that determines whether the metadata file's digest should be
|
||||
prepended to the filename.
|
||||
|
||||
<Exceptions>
|
||||
securesystemslib.exceptions.FormatError, if the arguments are improperly formatted.
|
||||
securesystemslib.exceptions.FormatError, if the arguments are improperly
|
||||
formatted.
|
||||
|
||||
securesystemslib.exceptions.Error, if the directory of 'filename' does not exist.
|
||||
securesystemslib.exceptions.Error, if the directory of 'filename' does not
|
||||
exist.
|
||||
|
||||
Any other runtime (e.g., IO) exception.
|
||||
|
||||
|
|
@ -1811,7 +1766,6 @@ def write_metadata_file(metadata, filename, version_number,
|
|||
tuf.formats.SIGNABLE_SCHEMA.check_match(metadata)
|
||||
securesystemslib.formats.PATH_SCHEMA.check_match(filename)
|
||||
tuf.formats.METADATAVERSION_SCHEMA.check_match(version_number)
|
||||
tuf.formats.COMPRESSIONS_SCHEMA.check_match(compression_algorithms)
|
||||
securesystemslib.formats.BOOLEAN_SCHEMA.check_match(consistent_snapshot)
|
||||
|
||||
# Verify the directory of 'filename', and convert 'filename' to its absolute
|
||||
|
|
@ -1878,115 +1832,21 @@ def write_metadata_file(metadata, filename, version_number,
|
|||
os.link(written_consistent_filename, written_filename)
|
||||
|
||||
else:
|
||||
raise securesystemslib.exceptions.InvalidConfigurationError('The consistent method specified'
|
||||
' in tuf.settings.py is not supported, try either "copy" or "hard_link"')
|
||||
raise securesystemslib.exceptions.InvalidConfigurationError('The'
|
||||
' consistent method specified in tuf.settings.py is not supported, try'
|
||||
' either "copy" or "hard_link"')
|
||||
|
||||
else:
|
||||
logger.debug('Not creating a consistent snapshot for ' + repr(written_filename))
|
||||
logger.debug('Saving ' + repr(written_filename))
|
||||
file_object.move(written_filename)
|
||||
|
||||
# Generate the compressed versions of 'metadata', if necessary. A compressed
|
||||
# file may be written (without needing to write the uncompressed version) if
|
||||
# the repository maintainer adds compression after writing the uncompressed
|
||||
# version.
|
||||
for compression_algorithm in compression_algorithms:
|
||||
file_object = None
|
||||
|
||||
# Ignore the empty string that signifies non-compression. The uncompressed
|
||||
# file was previously written above, if necessary.
|
||||
if not len(compression_algorithm):
|
||||
continue
|
||||
|
||||
elif compression_algorithm == 'gz':
|
||||
file_object = securesystemslib.util.TempFile()
|
||||
compressed_filename = filename + '.gz'
|
||||
|
||||
# Instantiate a gzip object, but save compressed content to
|
||||
# 'file_object' (i.e., GzipFile instance is based on its 'fileobj'
|
||||
# argument).
|
||||
gzip_object = gzip.GzipFile(fileobj=file_object, mode='wb')
|
||||
try:
|
||||
gzip_object.write(file_content)
|
||||
|
||||
finally:
|
||||
gzip_object.close()
|
||||
|
||||
# This else clause should not be reached because the
|
||||
# 'compression_algorithms' list is validated against the
|
||||
# COMPRESSIONS_SCHEMA above.
|
||||
else: # pragma: no cover
|
||||
raise securesystemslib.exceptions.FormatError('Unknown compression algorithm:'
|
||||
' ' + repr(compression_algorithm))
|
||||
|
||||
# Save the compressed version, ensuring an unchanged file is not re-saved.
|
||||
# Re-saving the same compressed version may cause its digest to
|
||||
# unexpectedly change (gzip includes a timestamp) even though content has
|
||||
# not changed.
|
||||
_write_compressed_metadata(file_object, compressed_filename,
|
||||
True, consistent_snapshot,
|
||||
version_number)
|
||||
return written_filename
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
def _write_compressed_metadata(file_object, compressed_filename,
|
||||
write_new_metadata, consistent_snapshot, version_number):
|
||||
"""
|
||||
Write compressed versions of metadata, ensuring compressed file that have
|
||||
not changed are not re-written, the digest of the compressed file is properly
|
||||
added to the compressed filename, and consistent snapshots are also saved.
|
||||
Ensure compressed files are written to a temporary location, and then
|
||||
moved to their destinations.
|
||||
"""
|
||||
|
||||
# If a consistent snapshot is unneeded, 'file_object' may be simply moved
|
||||
# 'compressed_filename' if not already written.
|
||||
if not consistent_snapshot:
|
||||
if write_new_metadata or not os.path.exists(compressed_filename):
|
||||
file_object.move(compressed_filename)
|
||||
|
||||
# The temporary file must be closed if 'file_object.move()' is not used.
|
||||
# securesystemslib.util.TempFile() automatically closes the temp file when move() is
|
||||
# called
|
||||
else:
|
||||
file_object.close_temp_file()
|
||||
|
||||
# consistent snapshots = True. Ensure the version number is included in the
|
||||
# compressed filename written, provided it does not already exist.
|
||||
else:
|
||||
compressed_content = file_object.read()
|
||||
consistent_filename = None
|
||||
version_and_filename = None
|
||||
|
||||
# Attach the version number to the compressed, consistent snapshot filename.
|
||||
dirname, basename = os.path.split(compressed_filename)
|
||||
|
||||
for compression_extension in SUPPORTED_COMPRESSION_EXTENSIONS:
|
||||
if basename.endswith(compression_extension):
|
||||
basename = basename.split(compression_extension, 1)[0]
|
||||
version_and_filename = str(version_number) + '.' + basename + compression_extension
|
||||
consistent_filename = os.path.join(dirname, version_and_filename)
|
||||
|
||||
else:
|
||||
logger.debug('Skipping compression extension: ' + repr(compression_extension))
|
||||
|
||||
# Move the 'securesystemslib.util.TempFile' object to one of the filenames so that it is
|
||||
# saved and the temporary file closed.
|
||||
if not os.path.exists(consistent_filename):
|
||||
logger.debug('Saving ' + repr(consistent_filename))
|
||||
file_object.move(consistent_filename)
|
||||
|
||||
else:
|
||||
logger.debug('Skipping already written compressed file:'
|
||||
' ' + repr(consistent_filename))
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
def _log_status_of_top_level_roles(targets_directory, metadata_directory,
|
||||
repository_name):
|
||||
"""
|
||||
|
|
|
|||
Loading…
Reference in a new issue