mirror of
https://github.com/theupdateframework/python-tuf
synced 2026-05-24 10:08:28 +00:00
Resolve merge conflicts
This commit is contained in:
commit
72fdbc53ea
73 changed files with 208 additions and 647 deletions
|
|
@ -420,13 +420,6 @@ Version: **1.0 (Draft)**
|
|||
Signed by the mirrors role's keys. Lists information about available
|
||||
mirrors and the content available from each mirror.
|
||||
|
||||
An implementation of the framework may optionally choose to make available
|
||||
any metadata files in compressed (e.g. gzip'd) format. In doing so, the
|
||||
filename of the compressed file should be the same as the original with the
|
||||
addition of the file name extension for the compression type (e.g.
|
||||
snapshot.json.gz). The original (uncompressed) file should always be made
|
||||
available, as well.
|
||||
|
||||
+ **3.1.2.1 Metadata files for targets delegation**
|
||||
|
||||
When the targets role delegates trust to other roles, each delegated role
|
||||
|
|
@ -541,7 +534,7 @@ Version: **1.0 (Draft)**
|
|||
The "signed" portion of root.json is as follows:
|
||||
|
||||
{ "_type" : "root",
|
||||
"compression_algorithms": [ COMPRESSION_ALGORITHM, ... ],
|
||||
"spec_version" : SPEC_VERSION,
|
||||
"consistent_snapshot": CONSISTENT_SNAPSHOT,
|
||||
"version" : VERSION,
|
||||
"expires" : EXPIRES,
|
||||
|
|
@ -555,10 +548,12 @@ Version: **1.0 (Draft)**
|
|||
, ... }
|
||||
}
|
||||
|
||||
COMPRESSION_ALGORITHM specifies one of the compression algorithms supported
|
||||
by the repository. Metadata files available on the repository may
|
||||
optionally be compressed with this algorithm. Compressed versions of
|
||||
metadata are not listed in snapshot.json.
|
||||
SPEC_VERSION is the version number of the specification. Metadata is
|
||||
written according to version "spec_version" of the specification, and
|
||||
clients MUST verify that "spec_version" matches the expected version number.
|
||||
Adopters are free to determine what is considered a match (e.g., the version
|
||||
number must exactly exactly, or perhaps only the major version number
|
||||
(major.minor.fix).
|
||||
|
||||
CONSISTENT_SNAPSHOT is a boolean indicating whether the repository supports
|
||||
consistent snapshots. Section 7 goes into more detail on the consequences
|
||||
|
|
@ -598,6 +593,7 @@ Version: **1.0 (Draft)**
|
|||
],
|
||||
"signed": {
|
||||
"_type": "root",
|
||||
"spec_version": "1",
|
||||
"consistent_snapshot": false,
|
||||
"expires": "2030-01-01T00:00:00Z",
|
||||
"keys": {
|
||||
|
|
@ -670,6 +666,7 @@ Version: **1.0 (Draft)**
|
|||
The "signed" portion of snapshot.json is as follows:
|
||||
|
||||
{ "_type" : "snapshot",
|
||||
"spec_version" : SPEC_VERSION,
|
||||
"version" : VERSION,
|
||||
"expires" : EXPIRES,
|
||||
"meta" : METAFILES
|
||||
|
|
@ -700,6 +697,7 @@ Version: **1.0 (Draft)**
|
|||
],
|
||||
"signed": {
|
||||
"_type": "snapshot",
|
||||
"spec_version": "1",
|
||||
"expires": "2030-01-01T00:00:00Z",
|
||||
"meta": {
|
||||
"root.json": {
|
||||
|
|
@ -721,6 +719,7 @@ Version: **1.0 (Draft)**
|
|||
The "signed" portion of targets.json is as follows:
|
||||
|
||||
{ "_type" : "targets",
|
||||
"spec_version" : SPEC_VERSION,
|
||||
"version" : VERSION,
|
||||
"expires" : EXPIRES,
|
||||
"targets" : TARGETS,
|
||||
|
|
@ -829,6 +828,7 @@ Version: **1.0 (Draft)**
|
|||
],
|
||||
"signed": {
|
||||
"_type": "targets",
|
||||
"spec_version": "1",
|
||||
"delegations": {
|
||||
"keys": {
|
||||
"ce3e02e72980b09ca6f5efa68197130b381921e5d0675e2e0c8f3c47e0626bba": {
|
||||
|
|
@ -884,6 +884,7 @@ Version: **1.0 (Draft)**
|
|||
The "signed" portion of timestamp.json is as follows:
|
||||
|
||||
{ "_type" : "timestamp",
|
||||
"spec_version" : SPEC_VERSION,
|
||||
"version" : VERSION,
|
||||
"expires" : EXPIRES,
|
||||
"meta" : METAFILES
|
||||
|
|
@ -905,6 +906,7 @@ Version: **1.0 (Draft)**
|
|||
],
|
||||
"signed": {
|
||||
"_type": "timestamp",
|
||||
"spec_version": "1",
|
||||
"expires": "2030-01-01T00:00:00Z",
|
||||
"meta": {
|
||||
"snapshot.json": {
|
||||
|
|
@ -929,6 +931,7 @@ Version: **1.0 (Draft)**
|
|||
|
||||
|
||||
{ "_type" : "mirrors",
|
||||
"spec_version" : SPEC_VERSION,
|
||||
"version" : VERSION,
|
||||
"expires" : EXPIRES,
|
||||
"mirrors" : [
|
||||
|
|
|
|||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
|
|
@ -135,13 +135,6 @@
|
|||
repository.targets('role1').expiration = datetime.datetime(2030, 1, 1, 0, 0)
|
||||
repository.targets('role2').expiration = datetime.datetime(2030, 1, 1, 0, 0)
|
||||
|
||||
# Compress the top-level role metadata so that the unit tests have a
|
||||
# pre-generated example of compressed metadata.
|
||||
repository.root.compressions = ['gz']
|
||||
repository.targets.compressions = ['gz']
|
||||
repository.snapshot.compressions = ['gz']
|
||||
repository.timestamp.compressions = ['gz']
|
||||
|
||||
# Create the actual metadata files, which are saved to 'metadata.staged'.
|
||||
if not options.dry_run:
|
||||
repository.writeall()
|
||||
|
|
|
|||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
|
|
@ -347,10 +347,6 @@ def test_write(self):
|
|||
# + backup the name.
|
||||
name_backup = project._project_name
|
||||
|
||||
# Set the compressions. We will be checking this part here too.
|
||||
project.compressions = ['gz']
|
||||
project('delegation').compressions = project.compressions
|
||||
|
||||
# Write and reload.
|
||||
self.assertRaises(securesystemslib.exceptions.Error, project.write)
|
||||
project.write(write_partial=True)
|
||||
|
|
|
|||
|
|
@ -51,12 +51,6 @@ def test_bad_hash_error(self):
|
|||
logger.error(bad_hash_error)
|
||||
|
||||
|
||||
def test_decompression_error(self):
|
||||
download_exception = tuf.exceptions.DownloadError()
|
||||
decompression_error = tuf.exceptions.DecompressionError(download_exception)
|
||||
logger.error(decompression_error)
|
||||
|
||||
|
||||
# Run the unit tests.
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
|
|
|||
|
|
@ -197,9 +197,9 @@ def test_schemas(self):
|
|||
|
||||
'ROOT_SCHEMA': (tuf.formats.ROOT_SCHEMA,
|
||||
{'_type': 'root',
|
||||
'spec_version': '1.0',
|
||||
'version': 8,
|
||||
'consistent_snapshot': False,
|
||||
'compression_algorithms': ['gz'],
|
||||
'expires': '1985-10-21T13:20:00Z',
|
||||
'keys': {'123abc': {'keytype': 'rsa',
|
||||
'scheme': 'rsassa-pss-sha256',
|
||||
|
|
@ -211,6 +211,7 @@ def test_schemas(self):
|
|||
|
||||
'TARGETS_SCHEMA': (tuf.formats.TARGETS_SCHEMA,
|
||||
{'_type': 'targets',
|
||||
'spec_version': '1.0',
|
||||
'version': 8,
|
||||
'expires': '1985-10-21T13:20:00Z',
|
||||
'targets': {'metadata/targets.json': {'length': 1024,
|
||||
|
|
@ -226,12 +227,14 @@ def test_schemas(self):
|
|||
|
||||
'SNAPSHOT_SCHEMA': (tuf.formats.SNAPSHOT_SCHEMA,
|
||||
{'_type': 'snapshot',
|
||||
'spec_version': '1.0',
|
||||
'version': 8,
|
||||
'expires': '1985-10-21T13:20:00Z',
|
||||
'meta': {'snapshot.json': {'version': 1024}}}),
|
||||
|
||||
'TIMESTAMP_SCHEMA': (tuf.formats.TIMESTAMP_SCHEMA,
|
||||
{'_type': 'timestamp',
|
||||
'spec_version': '1.0',
|
||||
'version': 8,
|
||||
'expires': '1985-10-21T13:20:00Z',
|
||||
'meta': {'metadattimestamp.json': {'length': 1024,
|
||||
|
|
@ -254,6 +257,7 @@ def test_schemas(self):
|
|||
'MIRRORLIST_SCHEMA': (tuf.formats.MIRRORLIST_SCHEMA,
|
||||
{'_type': 'mirrors',
|
||||
'version': 8,
|
||||
'spec_version': '1.0',
|
||||
'expires': '1985-10-21T13:20:00Z',
|
||||
'mirrors': [{'url_prefix': 'http://localhost:8001',
|
||||
'metadata_path': 'metadata/',
|
||||
|
|
@ -336,6 +340,7 @@ def test_TimestampFile(self):
|
|||
|
||||
|
||||
|
||||
|
||||
def test_RootFile(self):
|
||||
# Test conditions for valid instances of 'tuf.formats.RootFile'.
|
||||
version = 8
|
||||
|
|
@ -351,18 +356,14 @@ def test_RootFile(self):
|
|||
'threshold': 1,
|
||||
'paths': ['path1/', 'path2']}}
|
||||
|
||||
compression_algorithms = ['gz']
|
||||
|
||||
make_metadata = tuf.formats.RootFile.make_metadata
|
||||
from_metadata = tuf.formats.RootFile.from_metadata
|
||||
ROOT_SCHEMA = tuf.formats.ROOT_SCHEMA
|
||||
|
||||
self.assertTrue(ROOT_SCHEMA.matches(make_metadata(version, expires,
|
||||
keydict, roledict,
|
||||
consistent_snapshot,
|
||||
compression_algorithms)))
|
||||
keydict, roledict, consistent_snapshot)))
|
||||
metadata = make_metadata(version, expires, keydict, roledict,
|
||||
consistent_snapshot, compression_algorithms)
|
||||
consistent_snapshot)
|
||||
self.assertTrue(isinstance(from_metadata(metadata), tuf.formats.RootFile))
|
||||
|
||||
# Test conditions for invalid arguments.
|
||||
|
|
@ -370,28 +371,15 @@ def test_RootFile(self):
|
|||
bad_expires = 'eight'
|
||||
bad_keydict = 123
|
||||
bad_roledict = 123
|
||||
bad_compression_algorithms = ['nozip']
|
||||
|
||||
self.assertRaises(securesystemslib.exceptions.FormatError, make_metadata, bad_version,
|
||||
expires,
|
||||
keydict, roledict,
|
||||
consistent_snapshot,
|
||||
compression_algorithms)
|
||||
self.assertRaises(securesystemslib.exceptions.FormatError, make_metadata, version,
|
||||
bad_expires,
|
||||
keydict, roledict,
|
||||
consistent_snapshot,
|
||||
compression_algorithms)
|
||||
self.assertRaises(securesystemslib.exceptions.FormatError, make_metadata, version,
|
||||
expires,
|
||||
bad_keydict, roledict,
|
||||
consistent_snapshot,
|
||||
compression_algorithms)
|
||||
self.assertRaises(securesystemslib.exceptions.FormatError, make_metadata, version,
|
||||
expires,
|
||||
keydict, bad_roledict,
|
||||
consistent_snapshot,
|
||||
compression_algorithms)
|
||||
self.assertRaises(securesystemslib.exceptions.FormatError, make_metadata,
|
||||
bad_version, expires, keydict, roledict, consistent_snapshot)
|
||||
self.assertRaises(securesystemslib.exceptions.FormatError, make_metadata,
|
||||
version, bad_expires, keydict, roledict, consistent_snapshot)
|
||||
self.assertRaises(securesystemslib.exceptions.FormatError, make_metadata,
|
||||
version, expires, bad_keydict, roledict, consistent_snapshot)
|
||||
self.assertRaises(securesystemslib.exceptions.FormatError, make_metadata,
|
||||
version, expires, keydict, bad_roledict, consistent_snapshot)
|
||||
|
||||
self.assertRaises(securesystemslib.exceptions.FormatError, from_metadata, 'bad')
|
||||
|
||||
|
|
@ -557,9 +545,9 @@ def test_parse_base64(self):
|
|||
def test_make_signable(self):
|
||||
# Test conditions for expected make_signable() behavior.
|
||||
root = {'_type': 'root',
|
||||
'spec_version': '1.0',
|
||||
'version': 8,
|
||||
'consistent_snapshot': False,
|
||||
'compression_algorithms': ['gz'],
|
||||
'expires': '1985-10-21T13:20:00Z',
|
||||
'keys': {'123abc': {'keytype': 'rsa',
|
||||
'scheme': 'rsassa-pss-sha256',
|
||||
|
|
@ -582,6 +570,8 @@ def test_make_signable(self):
|
|||
|
||||
|
||||
|
||||
|
||||
|
||||
def test_make_fileinfo(self):
|
||||
# Test conditions for valid arguments.
|
||||
length = 1024
|
||||
|
|
@ -707,9 +697,9 @@ def test_expected_meta_rolename(self):
|
|||
def test_check_signable_object_format(self):
|
||||
# Test condition for a valid argument.
|
||||
root = {'_type': 'root',
|
||||
'spec_version': '1.0',
|
||||
'version': 8,
|
||||
'consistent_snapshot': False,
|
||||
'compression_algorithms': ['gz'],
|
||||
'expires': '1985-10-21T13:20:00Z',
|
||||
'keys': {'123abc': {'keytype': 'rsa',
|
||||
'scheme': 'rsassa-pss-sha256',
|
||||
|
|
|
|||
|
|
@ -311,13 +311,9 @@ def test_create_keydb_from_root_metadata(self):
|
|||
version = 8
|
||||
consistent_snapshot = False
|
||||
expires = '1985-10-21T01:21:00Z'
|
||||
compression_algorithms = ['gz']
|
||||
|
||||
root_metadata = tuf.formats.RootFile.make_metadata(version,
|
||||
expires,
|
||||
keydict, roledict,
|
||||
consistent_snapshot,
|
||||
compression_algorithms)
|
||||
root_metadata = tuf.formats.RootFile.make_metadata(version, expires,
|
||||
keydict, roledict, consistent_snapshot)
|
||||
self.assertEqual(None, tuf.keydb.create_keydb_from_root_metadata(root_metadata))
|
||||
tuf.keydb.create_keydb_from_root_metadata(root_metadata)
|
||||
|
||||
|
|
@ -331,17 +327,17 @@ def test_create_keydb_from_root_metadata(self):
|
|||
|
||||
# Test conditions for arguments with invalid formats.
|
||||
self.assertRaises(securesystemslib.exceptions.FormatError,
|
||||
tuf.keydb.create_keydb_from_root_metadata, None)
|
||||
tuf.keydb.create_keydb_from_root_metadata, None)
|
||||
self.assertRaises(securesystemslib.exceptions.FormatError,
|
||||
tuf.keydb.create_keydb_from_root_metadata, '')
|
||||
tuf.keydb.create_keydb_from_root_metadata, '')
|
||||
self.assertRaises(securesystemslib.exceptions.FormatError,
|
||||
tuf.keydb.create_keydb_from_root_metadata, 123)
|
||||
tuf.keydb.create_keydb_from_root_metadata, 123)
|
||||
self.assertRaises(securesystemslib.exceptions.FormatError,
|
||||
tuf.keydb.create_keydb_from_root_metadata, ['123'])
|
||||
tuf.keydb.create_keydb_from_root_metadata, ['123'])
|
||||
self.assertRaises(securesystemslib.exceptions.FormatError,
|
||||
tuf.keydb.create_keydb_from_root_metadata, {'bad': '123'})
|
||||
tuf.keydb.create_keydb_from_root_metadata, {'bad': '123'})
|
||||
self.assertRaises(securesystemslib.exceptions.FormatError,
|
||||
tuf.keydb.create_keydb_from_root_metadata, root_metadata, 123)
|
||||
tuf.keydb.create_keydb_from_root_metadata, root_metadata, 123)
|
||||
|
||||
# Verify that a keydb cannot be created for a non-existent repository name.
|
||||
tuf.keydb.create_keydb_from_root_metadata(root_metadata, 'non-existent')
|
||||
|
|
@ -367,13 +363,9 @@ def test_create_keydb_from_root_metadata(self):
|
|||
keydict[keyid3] = rsakey3
|
||||
version = 8
|
||||
expires = '1985-10-21T01:21:00Z'
|
||||
compression_algorithms = ['gz']
|
||||
|
||||
root_metadata = tuf.formats.RootFile.make_metadata(version,
|
||||
expires,
|
||||
keydict, roledict,
|
||||
consistent_snapshot,
|
||||
compression_algorithms)
|
||||
root_metadata = tuf.formats.RootFile.make_metadata(version, expires,
|
||||
keydict, roledict, consistent_snapshot)
|
||||
self.assertEqual(None, tuf.keydb.create_keydb_from_root_metadata(root_metadata))
|
||||
|
||||
# Ensure only 'keyid2' was added to the keydb database. 'keyid' and
|
||||
|
|
|
|||
|
|
@ -749,19 +749,17 @@ def test_write_metadata_file(self):
|
|||
root_signable = securesystemslib.util.load_json_file(root_filename)
|
||||
|
||||
output_filename = os.path.join(temporary_directory, 'root.json')
|
||||
compression_algorithms = ['gz']
|
||||
version_number = root_signable['signed']['version'] + 1
|
||||
|
||||
self.assertFalse(os.path.exists(output_filename))
|
||||
repo_lib.write_metadata_file(root_signable, output_filename, version_number,
|
||||
compression_algorithms, consistent_snapshot=False)
|
||||
consistent_snapshot=False)
|
||||
self.assertTrue(os.path.exists(output_filename))
|
||||
self.assertTrue(os.path.exists(output_filename + '.gz'))
|
||||
|
||||
# Attempt to over-write the previously written metadata file. An exception
|
||||
# is not raised in this case, only a debug message is logged.
|
||||
repo_lib.write_metadata_file(root_signable, output_filename, version_number,
|
||||
compression_algorithms, consistent_snapshot=False)
|
||||
consistent_snapshot=False)
|
||||
|
||||
# Try to write a consistent metadate file. An exception is not raised in
|
||||
# this case. For testing purposes, root.json should be a hard link to the
|
||||
|
|
@ -769,7 +767,7 @@ def test_write_metadata_file(self):
|
|||
# the latest consistent files.
|
||||
tuf.settings.CONSISTENT_METHOD = 'hard_link'
|
||||
repo_lib.write_metadata_file(root_signable, output_filename, version_number,
|
||||
compression_algorithms, consistent_snapshot=True)
|
||||
consistent_snapshot=True)
|
||||
|
||||
# Test if the consistent files are properly named
|
||||
# Filename format of a consistent file: <version number>.rolename.json
|
||||
|
|
@ -780,9 +778,7 @@ def test_write_metadata_file(self):
|
|||
# Try to add more consistent metadata files.
|
||||
version_number += 1
|
||||
repo_lib.write_metadata_file(root_signable, output_filename,
|
||||
version_number,
|
||||
compression_algorithms,
|
||||
consistent_snapshot=True)
|
||||
version_number, consistent_snapshot=True)
|
||||
|
||||
# Test if the the latest root.json points to the expected consistent file
|
||||
# and consistent metadata do not all point to the same root.json
|
||||
|
|
@ -796,76 +792,34 @@ def test_write_metadata_file(self):
|
|||
tuf.settings.CONSISTENT_METHOD = 'somebadidea'
|
||||
self.assertRaises(securesystemslib.exceptions.InvalidConfigurationError,
|
||||
repo_lib.write_metadata_file, root_signable, output_filename,
|
||||
version_number, compression_algorithms, consistent_snapshot=True)
|
||||
version_number, consistent_snapshot=True)
|
||||
|
||||
# Try to create a link to root.json when root.json doesn't exist locally.
|
||||
# repository_lib should log a message if this is the case.
|
||||
tuf.settings.CONSISTENT_METHOD = 'hard_link'
|
||||
os.remove(output_filename)
|
||||
repo_lib.write_metadata_file(root_signable, output_filename, version_number,
|
||||
compression_algorithms, consistent_snapshot=True)
|
||||
consistent_snapshot=True)
|
||||
|
||||
# Reset CONSISTENT_METHOD so that subsequent tests work as expected.
|
||||
tuf.settings.CONSISTENT_METHOD = 'copy'
|
||||
|
||||
# Test for unknown compression algorithm.
|
||||
self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.write_metadata_file,
|
||||
root_signable, output_filename, version_number, compression_algorithms=['bad_algo'],
|
||||
consistent_snapshot=False)
|
||||
|
||||
# Test improperly formatted arguments.
|
||||
self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.write_metadata_file,
|
||||
3, output_filename, version_number,
|
||||
compression_algorithms, False)
|
||||
3, output_filename, version_number, False)
|
||||
self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.write_metadata_file,
|
||||
root_signable, 3, version_number, compression_algorithms,
|
||||
False)
|
||||
root_signable, 3, version_number, False)
|
||||
self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.write_metadata_file,
|
||||
root_signable, output_filename, '3',
|
||||
compression_algorithms, False)
|
||||
root_signable, output_filename, '3', False)
|
||||
self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.write_metadata_file,
|
||||
root_signable, output_filename, version_number,
|
||||
compression_algorithms, 3)
|
||||
root_signable, output_filename, version_number, 3)
|
||||
|
||||
|
||||
|
||||
def test__write_compressed_metadata(self):
|
||||
# Test for invalid 'compressed_filename' argument and set
|
||||
# 'write_new_metadata' to False.
|
||||
file_object = securesystemslib.util.TempFile()
|
||||
existing_filename = os.path.join('repository_data', 'repository',
|
||||
'metadata', 'root.json')
|
||||
|
||||
write_new_metadata = False
|
||||
repo_lib._write_compressed_metadata(file_object,
|
||||
compressed_filename=existing_filename,
|
||||
write_new_metadata=write_new_metadata,
|
||||
consistent_snapshot=False,
|
||||
version_number=8)
|
||||
|
||||
# Test writing of compressed metadata when consistent snapshots is enabled.
|
||||
file_object = securesystemslib.util.TempFile()
|
||||
shutil.copy(existing_filename, os.path.join(self.temporary_directory, '8.root.json.gz'))
|
||||
shutil.copy(existing_filename, os.path.join(self.temporary_directory, '8.root.json.zip'))
|
||||
shutil.copy(existing_filename, os.path.join(self.temporary_directory, 'root.json.zip'))
|
||||
compressed_filename = os.path.join(self.temporary_directory, 'root.json.gz')
|
||||
|
||||
# For testing purposes, add additional compression algorithms to
|
||||
# repo_lib.SUPPORTED_COMPRESSION_EXTENSIONS.
|
||||
repo_lib.SUPPORTED_COMPRESSION_EXTENSIONS = ['gz', 'zip', 'bz2']
|
||||
repo_lib._write_compressed_metadata(file_object,
|
||||
compressed_filename=compressed_filename,
|
||||
write_new_metadata=True,
|
||||
consistent_snapshot=True,
|
||||
version_number=8)
|
||||
repo_lib.SUPPORTED_COMPRESSION_EXTENSIONS = ['gz']
|
||||
|
||||
|
||||
def test_create_tuf_client_directory(self):
|
||||
# Test normal case.
|
||||
temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory)
|
||||
repository_directory = os.path.join('repository_data',
|
||||
'repository')
|
||||
repository_directory = os.path.join('repository_data', 'repository')
|
||||
client_directory = os.path.join(temporary_directory, 'client')
|
||||
|
||||
repo_lib.create_tuf_client_directory(repository_directory, client_directory)
|
||||
|
|
@ -881,15 +835,16 @@ def test_create_tuf_client_directory(self):
|
|||
|
||||
|
||||
# Test improperly formatted arguments.
|
||||
self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.create_tuf_client_directory,
|
||||
3, client_directory)
|
||||
self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.create_tuf_client_directory,
|
||||
repository_directory, 3)
|
||||
self.assertRaises(securesystemslib.exceptions.FormatError,
|
||||
repo_lib.create_tuf_client_directory, 3, client_directory)
|
||||
self.assertRaises(securesystemslib.exceptions.FormatError,
|
||||
repo_lib.create_tuf_client_directory, repository_directory, 3)
|
||||
|
||||
|
||||
# Test invalid argument (i.e., client directory already exists.)
|
||||
self.assertRaises(securesystemslib.exceptions.RepositoryError, repo_lib.create_tuf_client_directory,
|
||||
repository_directory, client_directory)
|
||||
self.assertRaises(securesystemslib.exceptions.RepositoryError,
|
||||
repo_lib.create_tuf_client_directory, repository_directory,
|
||||
client_directory)
|
||||
|
||||
# Test invalid client metadata directory (i.e., non-errno.EEXIST exceptions
|
||||
# should be re-raised.)
|
||||
|
|
@ -903,7 +858,7 @@ def test_create_tuf_client_directory(self):
|
|||
os.chmod(client_directory, current_client_directory_mode & ~stat.S_IWUSR)
|
||||
|
||||
self.assertRaises(OSError, repo_lib.create_tuf_client_directory,
|
||||
repository_directory, client_directory)
|
||||
repository_directory, client_directory)
|
||||
|
||||
# Reset the client directory's mode.
|
||||
os.chmod(client_directory, current_client_directory_mode)
|
||||
|
|
@ -912,7 +867,8 @@ def test_create_tuf_client_directory(self):
|
|||
|
||||
def test__check_directory(self):
|
||||
# Test for non-existent directory.
|
||||
self.assertRaises(securesystemslib.exceptions.Error, repo_lib._check_directory, 'non-existent')
|
||||
self.assertRaises(securesystemslib.exceptions.Error,
|
||||
repo_lib._check_directory, 'non-existent')
|
||||
|
||||
|
||||
|
||||
|
|
@ -954,8 +910,7 @@ def test__generate_and_write_metadata(self):
|
|||
|
||||
repo_lib._generate_and_write_metadata('obsolete_role', obsolete_metadata,
|
||||
targets_directory, metadata_directory, consistent_snapshot=False,
|
||||
filenames=None, compression_algorithms=['gz'],
|
||||
repository_name=repository_name)
|
||||
filenames=None, repository_name=repository_name)
|
||||
|
||||
snapshot_filepath = os.path.join('repository_data', 'repository',
|
||||
'metadata', 'snapshot.json')
|
||||
|
|
@ -1022,20 +977,13 @@ def test__load_top_level_metadata(self):
|
|||
signable = securesystemslib.util.load_json_file(os.path.join(metadata_directory, 'root.json'))
|
||||
signable['signatures'].append(signable['signatures'][0])
|
||||
|
||||
repo_lib.write_metadata_file(signable, root_file, 8, ['gz'], False)
|
||||
repo_lib.write_metadata_file(signable, root_file, 8, False)
|
||||
|
||||
# Attempt to load a repository that contains a compressed Root file.
|
||||
repository = repo_tool.create_new_repository(repository_directory, repository_name)
|
||||
filenames = repo_lib.get_metadata_filenames(metadata_directory)
|
||||
repo_lib._load_top_level_metadata(repository, filenames, repository_name)
|
||||
|
||||
# Remove compressed metadata so that we can test for loading of a
|
||||
# repository with no compression enabled.
|
||||
for role_file in os.listdir(metadata_directory):
|
||||
if role_file.endswith('.json.gz'):
|
||||
role_filename = os.path.join(metadata_directory, role_file)
|
||||
os.remove(role_filename)
|
||||
|
||||
filenames = repo_lib.get_metadata_filenames(metadata_directory)
|
||||
repository = repo_tool.create_new_repository(repository_directory, repository_name)
|
||||
repo_lib._load_top_level_metadata(repository, filenames, repository_name)
|
||||
|
|
|
|||
|
|
@ -227,7 +227,6 @@ def test_writeall(self):
|
|||
repository.targets('role1').load_signing_key(role1_privkey)
|
||||
|
||||
# (6) Write repository.
|
||||
repository.targets.compressions = ['gz']
|
||||
repository.writeall()
|
||||
|
||||
# Verify that the expected metadata is written.
|
||||
|
|
@ -240,10 +239,6 @@ def test_writeall(self):
|
|||
|
||||
self.assertTrue(os.path.exists(role_filepath))
|
||||
|
||||
if role == 'targets.json':
|
||||
compressed_filepath = role_filepath + '.gz'
|
||||
self.assertTrue(os.path.exists(compressed_filepath))
|
||||
|
||||
# Verify the 'role1.json' delegation is also written.
|
||||
role1_filepath = os.path.join(metadata_directory, 'role1.json')
|
||||
role1_signable = securesystemslib.util.load_json_file(role1_filepath)
|
||||
|
|
@ -373,8 +368,7 @@ def test_writeall(self):
|
|||
self.assertEqual(['timestamp'], tuf.roledb.get_dirty_roles(repository_name))
|
||||
|
||||
# Test improperly formatted arguments.
|
||||
self.assertRaises(securesystemslib.exceptions.FormatError, repository.writeall, 3, False)
|
||||
self.assertRaises(securesystemslib.exceptions.FormatError, repository.writeall, False, 3)
|
||||
self.assertRaises(securesystemslib.exceptions.FormatError, repository.writeall, 3)
|
||||
|
||||
|
||||
|
||||
|
|
@ -388,11 +382,9 @@ def test_get_filepaths_in_directory(self):
|
|||
# Verify the expected filenames. get_filepaths_in_directory() returns
|
||||
# a list of absolute paths.
|
||||
metadata_files = repo.get_filepaths_in_directory(metadata_directory)
|
||||
expected_files = ['1.root.json', '1.root.json.gz', 'root.json',
|
||||
'targets.json', 'targets.json.gz', 'snapshot.json',
|
||||
'snapshot.json.gz', 'timestamp.json',
|
||||
'timestamp.json.gz', 'role1.json', 'role1.json.gz',
|
||||
'role2.json', 'role2.json.gz']
|
||||
expected_files = ['1.root.json', 'root.json',
|
||||
'targets.json', 'snapshot.json',
|
||||
'timestamp.json', 'role1.json', 'role2.json']
|
||||
|
||||
basenames = []
|
||||
for filepath in metadata_files:
|
||||
|
|
@ -448,7 +440,7 @@ def __init__(self):
|
|||
roleinfo = {'keyids': [], 'signing_keyids': [], 'threshold': 1,
|
||||
'signatures': [], 'version': 0,
|
||||
'consistent_snapshot': False,
|
||||
'compressions': [''], 'expires': expiration,
|
||||
'expires': expiration,
|
||||
'partial_loaded': False}
|
||||
|
||||
tuf.roledb.add_role(self._rolename, roleinfo,
|
||||
|
|
@ -573,24 +565,6 @@ def test_signing_keys(self):
|
|||
|
||||
|
||||
|
||||
def test_compressions(self):
|
||||
# Test default case, where only uncompressed metadata is supported.
|
||||
self.assertEqual(self.metadata.compressions, [''])
|
||||
|
||||
# Test compressions getter after a compressions algorithm is added.
|
||||
self.metadata.compressions = ['gz']
|
||||
|
||||
self.assertEqual(self.metadata.compressions, ['', 'gz'])
|
||||
|
||||
|
||||
# Test improperly formatted argument.
|
||||
try:
|
||||
self.metadata.compressions = 3
|
||||
except securesystemslib.exceptions.FormatError:
|
||||
pass
|
||||
else:
|
||||
self.fail('Setter failed to detect improperly formatted compressions')
|
||||
|
||||
|
||||
|
||||
def test_add_verification_key(self):
|
||||
|
|
@ -608,8 +582,7 @@ def test_add_verification_key(self):
|
|||
expiration = expiration.isoformat() + 'Z'
|
||||
roleinfo = {'keyids': [], 'signing_keyids': [], 'threshold': 1,
|
||||
'signatures': [], 'version': 0,
|
||||
'consistent_snapshot': False,
|
||||
'compressions': [''], 'expires': expiration,
|
||||
'consistent_snapshot': False, 'expires': expiration,
|
||||
'partial_loaded': False}
|
||||
|
||||
tuf.roledb.add_role('Root', roleinfo, 'test_repository')
|
||||
|
|
@ -1708,11 +1681,6 @@ def test_load_repository(self):
|
|||
with open(bad_root_content, 'wb') as file_object:
|
||||
file_object.write(b'bad')
|
||||
|
||||
# Remove the compressed version of role1 to test whether the
|
||||
# load_repository() complains or not (it logs a message).
|
||||
role1_path = os.path.join(metadata_directory, 'role1.json.gz')
|
||||
os.remove(role1_path)
|
||||
|
||||
repository = repo_tool.load_repository(repository_directory)
|
||||
self.assertTrue(isinstance(repository, repo_tool.Repository))
|
||||
|
||||
|
|
|
|||
|
|
@ -542,13 +542,9 @@ def test_create_roledb_from_root_metadata(self):
|
|||
version = 8
|
||||
consistent_snapshot = False
|
||||
expires = '1985-10-21T01:21:00Z'
|
||||
compression_algorithms = ['gz']
|
||||
|
||||
root_metadata = tuf.formats.RootFile.make_metadata(version,
|
||||
expires,
|
||||
keydict, roledict,
|
||||
consistent_snapshot,
|
||||
compression_algorithms)
|
||||
expires, keydict, roledict, consistent_snapshot)
|
||||
self.assertEqual(None,
|
||||
tuf.roledb.create_roledb_from_root_metadata(root_metadata))
|
||||
|
||||
|
|
@ -594,12 +590,9 @@ def test_create_roledb_from_root_metadata(self):
|
|||
# Generate 'root_metadata' to verify that 'release' and 'root' are added
|
||||
# to the role database.
|
||||
root_metadata = tuf.formats.RootFile.make_metadata(version,
|
||||
expires,
|
||||
keydict, roledict,
|
||||
consistent_snapshot,
|
||||
compression_algorithms)
|
||||
expires, keydict, roledict, consistent_snapshot)
|
||||
self.assertEqual(None,
|
||||
tuf.roledb.create_roledb_from_root_metadata(root_metadata))
|
||||
tuf.roledb.create_roledb_from_root_metadata(root_metadata))
|
||||
|
||||
# Ensure only 'root' and 'release' were added to the role database.
|
||||
self.assertEqual(2, len(tuf.roledb._roledb_dict['default']))
|
||||
|
|
|
|||
|
|
@ -179,7 +179,6 @@ def test_root_role_versioning(self):
|
|||
repository.targets('role1').load_signing_key(role1_privkey)
|
||||
|
||||
# (6) Write repository.
|
||||
repository.targets.compressions = ['gz']
|
||||
repository.writeall()
|
||||
|
||||
self.assertTrue(os.path.exists(os.path.join(metadata_directory, 'root.json')))
|
||||
|
|
|
|||
|
|
@ -709,18 +709,16 @@ def test_3__update_metadata(self):
|
|||
# version is installed if the compressed one is downloaded.
|
||||
self.assertFalse('targets' in self.repository_updater.metadata['current'])
|
||||
self.repository_updater._update_metadata('targets',
|
||||
DEFAULT_TARGETS_FILELENGTH,
|
||||
targets_versioninfo['version'],
|
||||
'gzip')
|
||||
DEFAULT_TARGETS_FILELENGTH, targets_versioninfo['version'])
|
||||
self.assertTrue('targets' in self.repository_updater.metadata['current'])
|
||||
self.assertEqual(targets_versioninfo['version'],
|
||||
self.repository_updater.metadata['current']['targets']['version'])
|
||||
self.repository_updater.metadata['current']['targets']['version'])
|
||||
|
||||
# Test: Invalid / untrusted version numbers.
|
||||
# Invalid version number for the uncompressed version of 'targets.json'.
|
||||
# Invalid version number for 'targets.json'.
|
||||
self.assertRaises(tuf.exceptions.NoWorkingMirrorError,
|
||||
self.repository_updater._update_metadata,
|
||||
'targets', DEFAULT_TARGETS_FILELENGTH, 88)
|
||||
self.repository_updater._update_metadata,
|
||||
'targets', DEFAULT_TARGETS_FILELENGTH, 88)
|
||||
|
||||
# Verify that the specific exception raised is correct for the previous
|
||||
# case.
|
||||
|
|
@ -732,19 +730,13 @@ def test_3__update_metadata(self):
|
|||
for mirror_error in six.itervalues(e.mirror_errors):
|
||||
assert isinstance(mirror_error, securesystemslib.exceptions.BadVersionNumberError)
|
||||
|
||||
# Invalid version number for the compressed version of 'targets.json'
|
||||
self.assertRaises(tuf.exceptions.NoWorkingMirrorError,
|
||||
self.repository_updater._update_metadata,
|
||||
'targets', DEFAULT_TARGETS_FILELENGTH, 88,
|
||||
'gzip')
|
||||
|
||||
# Verify that the specific exception raised is correct for the previous
|
||||
# case. The version number is checked, so the specific error in
|
||||
# this case should be 'securesystemslib.exceptions.BadVersionNumberError'.
|
||||
try:
|
||||
self.repository_updater._update_metadata('targets',
|
||||
DEFAULT_TARGETS_FILELENGTH,
|
||||
88, 'gzip')
|
||||
88)
|
||||
|
||||
except tuf.exceptions.NoWorkingMirrorError as e:
|
||||
for mirror_error in six.itervalues(e.mirror_errors):
|
||||
|
|
@ -754,6 +746,37 @@ def test_3__update_metadata(self):
|
|||
|
||||
|
||||
|
||||
def test_3__get_metadata_file(self):
|
||||
|
||||
valid_tuf_version = tuf.formats.TUF_VERSION_NUMBER
|
||||
tuf.formats.TUF_VERSION_NUMBER = '2'
|
||||
|
||||
repository = repo_tool.load_repository(self.repository_directory)
|
||||
|
||||
repository.root.load_signing_key(self.role_keys['root']['private'])
|
||||
repository.targets.load_signing_key(self.role_keys['targets']['private'])
|
||||
repository.snapshot.load_signing_key(self.role_keys['snapshot']['private'])
|
||||
repository.timestamp.load_signing_key(self.role_keys['timestamp']['private'])
|
||||
repository.writeall()
|
||||
|
||||
# Move the staged metadata to the "live" metadata.
|
||||
shutil.rmtree(os.path.join(self.repository_directory, 'metadata'))
|
||||
shutil.copytree(os.path.join(self.repository_directory, 'metadata.staged'),
|
||||
os.path.join(self.repository_directory, 'metadata'))
|
||||
|
||||
upperbound_filelength = tuf.settings.DEFAULT_ROOT_REQUIRED_LENGTH
|
||||
self.assertRaises(tuf.exceptions.NoWorkingMirrorError,
|
||||
self.repository_updater._get_metadata_file, 'root', 'root.json',
|
||||
upperbound_filelength, 1)
|
||||
|
||||
# Reset the TUF_VERSION_NUMBER so that subsequent unit tests use the
|
||||
# expected value.
|
||||
tuf.formats.TUF_VERSION_NUMBER = valid_tuf_version
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
def test_3__update_metadata_if_changed(self):
|
||||
# Setup.
|
||||
# The client repository is initially loaded with only four top-level roles.
|
||||
|
|
|
|||
|
|
@ -202,8 +202,8 @@ top-level roles, including itself.
|
|||
|
||||
# A role's verification key(s) (to be more precise, the verification key's
|
||||
# keyid) may be queried. Other attributes include: signing_keys, version,
|
||||
# signatures, expiration, threshold, delegations (attribute available only to a
|
||||
# Targets role), and compressions.
|
||||
# signatures, expiration, threshold, and delegations (attribute available only
|
||||
# to a Targets role).
|
||||
>>> repository.root.keys
|
||||
['b23514431a53676595922e955c2d547293da4a7917e3ca243a175e72bbf718df']
|
||||
|
||||
|
|
|
|||
|
|
@ -140,6 +140,10 @@
|
|||
iso8601_logger = logging.getLogger('iso8601')
|
||||
iso8601_logger.disabled = True
|
||||
|
||||
# Metadata includes the specification version number that it follows.
|
||||
# All downloaded metadata must be equal to our supported major version of 1.
|
||||
# For example, "1.4.3" and "1.0.0" are supported. "2.0.0" is not supported.
|
||||
SUPPORTED_MAJOR_VERSION = 1
|
||||
|
||||
class Updater(object):
|
||||
"""
|
||||
|
|
@ -687,7 +691,7 @@ def refresh(self, unsafely_update_root_if_necessary=True):
|
|||
|
||||
|
||||
|
||||
def _update_root_metadata(self, current_root_metadata, compression_algorithm=None):
|
||||
def _update_root_metadata(self, current_root_metadata):
|
||||
"""
|
||||
<Purpose>
|
||||
The root file must be signed by the current root threshold and keys as
|
||||
|
|
@ -704,9 +708,6 @@ def _update_root_metadata(self, current_root_metadata, compression_algorithm=Non
|
|||
current_root_metadata:
|
||||
The currently held version of root.
|
||||
|
||||
compresison_algorithm:
|
||||
The compression algorithm used to compress remote metadata.
|
||||
|
||||
<Side Effects>
|
||||
Updates the root metadata files with the latest information.
|
||||
|
||||
|
|
@ -717,8 +718,7 @@ def _update_root_metadata(self, current_root_metadata, compression_algorithm=Non
|
|||
# Retrieve the latest, remote root.json.
|
||||
latest_root_metadata_file = \
|
||||
self._get_metadata_file('root', 'root.json',
|
||||
tuf.settings.DEFAULT_ROOT_REQUIRED_LENGTH, None,
|
||||
compression_algorithm=compression_algorithm)
|
||||
tuf.settings.DEFAULT_ROOT_REQUIRED_LENGTH, None)
|
||||
latest_root_metadata = \
|
||||
securesystemslib.util.load_json_string(latest_root_metadata_file.read().decode('utf-8'))
|
||||
|
||||
|
|
@ -737,7 +737,7 @@ def _update_root_metadata(self, current_root_metadata, compression_algorithm=Non
|
|||
# _update_metadata().
|
||||
self.consistent_snapshot = True
|
||||
self._update_metadata('root', tuf.settings.DEFAULT_ROOT_REQUIRED_LENGTH,
|
||||
version=version, compression_algorithm=compression_algorithm)
|
||||
version=version)
|
||||
|
||||
|
||||
|
||||
|
|
@ -925,18 +925,13 @@ def verify_target_file(target_file_object):
|
|||
self._hard_check_file_length(target_file_object, file_length)
|
||||
self._check_hashes(target_file_object, file_hashes)
|
||||
|
||||
# Target files, unlike metadata files, are not decompressed; the
|
||||
# 'compression' argument to _get_file() is needed only for decompression of
|
||||
# metadata. Target files may be compressed or uncompressed.
|
||||
if self.consistent_snapshot:
|
||||
target_digest = random.choice(list(file_hashes.values()))
|
||||
dirname, basename = os.path.split(target_filepath)
|
||||
target_filepath = os.path.join(dirname, target_digest + '.' + basename)
|
||||
|
||||
return self._get_file(target_filepath, verify_target_file,
|
||||
'target', file_length, compression=None,
|
||||
verify_compressed_file_function=None,
|
||||
download_safely=True)
|
||||
'target', file_length, download_safely=True)
|
||||
|
||||
|
||||
|
||||
|
|
@ -1015,8 +1010,7 @@ def _verify_uncompressed_metadata_file(self, metadata_file_object,
|
|||
|
||||
|
||||
def _get_metadata_file(self, metadata_role, remote_filename,
|
||||
upperbound_filelength, expected_version,
|
||||
compression_algorithm):
|
||||
upperbound_filelength, expected_version):
|
||||
"""
|
||||
<Purpose>
|
||||
Non-public method that tries downloading, up to a certain length, a
|
||||
|
|
@ -1039,10 +1033,6 @@ def _get_metadata_file(self, metadata_role, remote_filename,
|
|||
The expected and required version number of the 'metadata_role' file
|
||||
downloaded. 'expected_version' is an integer.
|
||||
|
||||
compression_algorithm:
|
||||
The name of the compression algorithm (e.g., 'gzip'). The algorithm is
|
||||
needed if the remote metadata file is compressed.
|
||||
|
||||
<Exceptions>
|
||||
tuf.exceptions.NoWorkingMirrorError:
|
||||
The metadata could not be fetched. This is raised only when all known
|
||||
|
|
@ -1068,19 +1058,29 @@ def _get_metadata_file(self, metadata_role, remote_filename,
|
|||
file_object = tuf.download.unsafe_download(file_mirror,
|
||||
upperbound_filelength)
|
||||
|
||||
if compression_algorithm is not None:
|
||||
logger.info('Decompressing ' + str(file_mirror))
|
||||
file_object.decompress_temp_file_object(compression_algorithm)
|
||||
|
||||
else:
|
||||
logger.info('Not decompressing ' + str(file_mirror))
|
||||
|
||||
# Verify 'file_object' according to the callable function.
|
||||
# 'file_object' is also verified if decompressed above (i.e., the
|
||||
# uncompressed version).
|
||||
metadata_signable = \
|
||||
securesystemslib.util.load_json_string(file_object.read().decode('utf-8'))
|
||||
|
||||
# Determine if the specification version number is supported. It is
|
||||
# assumed that "spec_version" is in (major.minor.fix) format, (for
|
||||
# example: "1.4.3") and that releases with the same major version
|
||||
# number maintain backwards compatibility. Consequently, if the major
|
||||
# version number of new metadata equals our expected major version
|
||||
# number, the new metadata is safe to parse.
|
||||
try:
|
||||
spec_version_parsed = metadata_signable['signed']['spec_version'].split('.')
|
||||
if int(spec_version_parsed[0]) != SUPPORTED_MAJOR_VERSION:
|
||||
raise securesystemslib.exceptions.BadVersionNumberError('Downloaded'
|
||||
' metadata that specifies an unsupported spec_version. Supported'
|
||||
' major version number: ' + repr(SUPPORTED_MAJOR_VERSION))
|
||||
|
||||
except (ValueError, TypeError):
|
||||
raise securesystemslib.excep4tions.FormatError('Improperly'
|
||||
' formatted spec_version, which must be in major.minor.fix format')
|
||||
|
||||
# If the version number is unspecified, ensure that the version number
|
||||
# downloaded is greater than the currently trusted version number for
|
||||
# 'metadata_role'.
|
||||
|
|
@ -1153,8 +1153,7 @@ def _verify_root_chain_link(self, rolename, current, next):
|
|||
|
||||
|
||||
def _get_file(self, filepath, verify_file_function, file_type,
|
||||
file_length, compression=None,
|
||||
verify_compressed_file_function=None, download_safely=True):
|
||||
file_length, download_safely=True):
|
||||
"""
|
||||
<Purpose>
|
||||
Non-public method that tries downloading, up to a certain length, a
|
||||
|
|
@ -1181,15 +1180,6 @@ def _get_file(self, filepath, verify_file_function, file_type,
|
|||
The expected length, or upper bound, of the target or metadata file to
|
||||
be downloaded.
|
||||
|
||||
compression:
|
||||
The name of the compression algorithm (e.g., 'gzip'), if the metadata
|
||||
file is compressed.
|
||||
|
||||
verify_compressed_file_function:
|
||||
If compression is specified, in the case of metadata files, this
|
||||
callable function may be set to perform verification of the compressed
|
||||
version of the metadata file. Decompressed metadata is also verified.
|
||||
|
||||
download_safely:
|
||||
A boolean switch to toggle safe or unsafe download of the file.
|
||||
|
||||
|
|
@ -1227,15 +1217,6 @@ def _get_file(self, filepath, verify_file_function, file_type,
|
|||
file_object = tuf.download.unsafe_download(file_mirror,
|
||||
file_length)
|
||||
|
||||
if compression is not None:
|
||||
if verify_compressed_file_function is not None:
|
||||
verify_compressed_file_function(file_object)
|
||||
logger.info('Decompressing ' + str(file_mirror))
|
||||
file_object.decompress_temp_file_object(compression)
|
||||
|
||||
else:
|
||||
logger.info('Not decompressing ' + str(file_mirror))
|
||||
|
||||
# Verify 'file_object' according to the callable function.
|
||||
# 'file_object' is also verified if decompressed above (i.e., the
|
||||
# uncompressed version).
|
||||
|
|
@ -1262,8 +1243,7 @@ def _get_file(self, filepath, verify_file_function, file_type,
|
|||
|
||||
|
||||
|
||||
def _update_metadata(self, metadata_role, upperbound_filelength, version=None,
|
||||
compression_algorithm=None):
|
||||
def _update_metadata(self, metadata_role, upperbound_filelength, version=None):
|
||||
"""
|
||||
<Purpose>
|
||||
Non-public method that downloads, verifies, and 'installs' the metadata
|
||||
|
|
@ -1285,12 +1265,6 @@ def _update_metadata(self, metadata_role, upperbound_filelength, version=None,
|
|||
The expected and required version number of the 'metadata_role' file
|
||||
downloaded. 'expected_version' is an integer.
|
||||
|
||||
compression_algorithm:
|
||||
A string designating the compression type of 'metadata_role'.
|
||||
The 'snapshot' metadata file may be optionally downloaded and stored in
|
||||
compressed form. Currently, only metadata files compressed with 'gzip'
|
||||
are considered. Any other string is ignored.
|
||||
|
||||
<Exceptions>
|
||||
tuf.exceptions.NoWorkingMirrorError:
|
||||
The metadata cannot be updated. This is not specific to a single
|
||||
|
|
@ -1308,12 +1282,7 @@ def _update_metadata(self, metadata_role, upperbound_filelength, version=None,
|
|||
|
||||
# Construct the metadata filename as expected by the download/mirror modules.
|
||||
metadata_filename = metadata_role + '.json'
|
||||
uncompressed_metadata_filename = metadata_filename
|
||||
|
||||
# The 'snapshot' or Targets metadata may be compressed. Add the appropriate
|
||||
# extension to 'metadata_filename'.
|
||||
if compression_algorithm == 'gzip':
|
||||
metadata_filename = metadata_filename + '.gz'
|
||||
metadata_filename = metadata_filename
|
||||
|
||||
# Attempt a file download from each mirror until the file is downloaded and
|
||||
# verified. If the signature of the downloaded file is valid, proceed,
|
||||
|
|
@ -1330,10 +1299,6 @@ def _update_metadata(self, metadata_role, upperbound_filelength, version=None,
|
|||
# for each other. In this case, we will download the metadata up to the
|
||||
# best length we can get for it, not request a specific version, but
|
||||
# perform the rest of the checks (e.g., signature verification).
|
||||
#
|
||||
# Note also that we presently support decompression of only "safe"
|
||||
# metadata, but this is easily extend to "unsafe" metadata as well as
|
||||
# "safe" targets.
|
||||
|
||||
remote_filename = metadata_filename
|
||||
filename_version = ''
|
||||
|
|
@ -1345,8 +1310,7 @@ def _update_metadata(self, metadata_role, upperbound_filelength, version=None,
|
|||
|
||||
metadata_file_object = \
|
||||
self._get_metadata_file(metadata_role, remote_filename,
|
||||
upperbound_filelength, version,
|
||||
compression_algorithm)
|
||||
upperbound_filelength, version)
|
||||
|
||||
# The metadata has been verified. Move the metadata file into place.
|
||||
# First, move the 'current' metadata file to the 'previous' directory
|
||||
|
|
@ -1371,16 +1335,7 @@ def _update_metadata(self, metadata_role, upperbound_filelength, version=None,
|
|||
metadata_signable = \
|
||||
securesystemslib.util.load_json_string(metadata_file_object.read().decode('utf-8'))
|
||||
|
||||
if compression_algorithm == 'gzip':
|
||||
current_uncompressed_filepath = \
|
||||
os.path.join(self.metadata_directory['current'],
|
||||
uncompressed_metadata_filename)
|
||||
current_uncompressed_filepath = \
|
||||
os.path.abspath(current_uncompressed_filepath)
|
||||
metadata_file_object.move(current_uncompressed_filepath)
|
||||
|
||||
else:
|
||||
metadata_file_object.move(current_filepath)
|
||||
metadata_file_object.move(current_filepath)
|
||||
|
||||
# Extract the metadata object so we can store it to the metadata store.
|
||||
# 'current_metadata_object' set to 'None' if there is not an object
|
||||
|
|
@ -1398,7 +1353,7 @@ def _update_metadata(self, metadata_role, upperbound_filelength, version=None,
|
|||
logger.debug('Updated ' + repr(current_filepath) + '.')
|
||||
self.metadata['previous'][metadata_role] = current_metadata_object
|
||||
self.metadata['current'][metadata_role] = updated_metadata_object
|
||||
self._update_versioninfo(uncompressed_metadata_filename)
|
||||
self._update_versioninfo(metadata_filename)
|
||||
|
||||
# Ensure the role and key information of the top-level roles is also updated
|
||||
# according to the newly-installed Root metadata.
|
||||
|
|
@ -1469,7 +1424,7 @@ def _update_metadata_if_changed(self, metadata_role,
|
|||
None.
|
||||
"""
|
||||
|
||||
uncompressed_metadata_filename = metadata_role + '.json'
|
||||
metadata_filename = metadata_role + '.json'
|
||||
expected_versioninfo = None
|
||||
expected_fileinfo = None
|
||||
|
||||
|
|
@ -1493,57 +1448,26 @@ def _update_metadata_if_changed(self, metadata_role,
|
|||
# strictly greater than its currently trusted version number.
|
||||
expected_versioninfo = self.metadata['current'][referenced_metadata] \
|
||||
['meta'] \
|
||||
[uncompressed_metadata_filename]
|
||||
[metadata_filename]
|
||||
|
||||
if not self._versioninfo_has_been_updated(uncompressed_metadata_filename,
|
||||
if not self._versioninfo_has_been_updated(metadata_filename,
|
||||
expected_versioninfo):
|
||||
logger.info(repr(uncompressed_metadata_filename) + ' up-to-date.')
|
||||
logger.info(repr(metadata_filename) + ' up-to-date.')
|
||||
|
||||
# Since we have not downloaded a new version of this metadata, we
|
||||
# should check to see if our local version is stale and notify the user
|
||||
# if so. This raises tuf.exceptions.ExpiredMetadataError if the metadata we
|
||||
# have is expired. Resolves issue #322.
|
||||
# Since we have not downloaded a new version of this metadata, we should
|
||||
# check to see if our local version is stale and notify the user if so.
|
||||
# This raises tuf.exceptions.ExpiredMetadataError if the metadata we have
|
||||
# is expired. Resolves issue #322.
|
||||
self._ensure_not_expired(self.metadata['current'][metadata_role],
|
||||
metadata_role)
|
||||
|
||||
# TODO: If 'metadata_role' is root or snapshot, we should verify that
|
||||
# root's hash matches what's in snapshot, and that snapshot hash matches
|
||||
# what's listed in timestamp.json.
|
||||
|
||||
return
|
||||
|
||||
logger.debug('Metadata ' + repr(uncompressed_metadata_filename) + ' has changed.')
|
||||
|
||||
# There might be a compressed version of 'snapshot.json' or Targets
|
||||
# metadata available for download. Check the 'meta' field of
|
||||
# 'referenced_metadata' to see if it is listed when 'metadata_role'
|
||||
# is 'snapshot'. The full rolename for delegated Targets metadata
|
||||
# must begin with 'targets/'. The snapshot role lists all the Targets
|
||||
# metadata available on the repository, including any that may be in
|
||||
# compressed form.
|
||||
#
|
||||
# In addition to validating the fileinfo (i.e., file lengths and hashes)
|
||||
# of the uncompressed metadata, the compressed version is also verified to
|
||||
# match its respective fileinfo. Verifying the compressed fileinfo ensures
|
||||
# untrusted data is not decompressed prior to verifying hashes, or
|
||||
# decompressing a file that may be invalid or partially intact.
|
||||
compression = None
|
||||
|
||||
# Check for the availability of compressed versions of 'snapshot.json',
|
||||
# 'targets.json', and delegated Targets (that also start with 'targets').
|
||||
# For 'targets.json' and delegated metadata, 'referenced_metata'
|
||||
# should always be 'snapshot'. 'snapshot.json' specifies all roles
|
||||
# provided by a repository, including their version numbers.
|
||||
if metadata_role == 'snapshot' or metadata_role.startswith('targets'):
|
||||
if 'gz' in self.metadata['current']['root']['compression_algorithms']:
|
||||
compression = 'gzip'
|
||||
gzip_metadata_filename = uncompressed_metadata_filename + '.gz'
|
||||
logger.debug('Compressed version of ' +
|
||||
repr(uncompressed_metadata_filename) + ' is available at ' +
|
||||
repr(gzip_metadata_filename) + '.')
|
||||
|
||||
else:
|
||||
logger.debug('Compressed version of ' +
|
||||
repr(uncompressed_metadata_filename) + ' not available.')
|
||||
logger.debug('Metadata ' + repr(metadata_filename) + ' has changed.')
|
||||
|
||||
# The file lengths of metadata are unknown, only their version numbers are
|
||||
# known. Set an upper limit for the length of the downloaded file for each
|
||||
|
|
@ -1558,7 +1482,7 @@ def _update_metadata_if_changed(self, metadata_role,
|
|||
|
||||
try:
|
||||
self._update_metadata(metadata_role, upperbound_filelength,
|
||||
expected_versioninfo['version'], compression)
|
||||
expected_versioninfo['version'])
|
||||
|
||||
except:
|
||||
# The current metadata we have is not current but we couldn't get new
|
||||
|
|
|
|||
|
|
@ -134,9 +134,6 @@
|
|||
# The full list of supported TUF metadata extensions.
|
||||
from tuf.repository_lib import METADATA_EXTENSIONS
|
||||
|
||||
# The recognized compression extensions.
|
||||
from tuf.repository_lib import SUPPORTED_COMPRESSION_EXTENSIONS
|
||||
|
||||
# Supported key types.
|
||||
from tuf.repository_lib import SUPPORTED_KEY_TYPES
|
||||
|
||||
|
|
@ -525,9 +522,8 @@ def _generate_and_write_metadata(rolename, metadata_filename, write_partial,
|
|||
|
||||
if tuf.sig.verify(signable, rolename, repository_name) or write_partial:
|
||||
_remove_invalid_and_duplicate_signatures(signable, repository_name)
|
||||
compressions = roleinfo['compressions']
|
||||
filename = write_metadata_file(signable, metadata_filename,
|
||||
metadata['version'], compressions, False)
|
||||
metadata['version'], False)
|
||||
|
||||
# 'signable' contains an invalid threshold of signatures.
|
||||
else:
|
||||
|
|
@ -912,7 +908,7 @@ def load_project(project_directory, prefix='', new_targets_location=None,
|
|||
for role in targets_metadata['delegations']['roles']:
|
||||
rolename = role['name']
|
||||
roleinfo = {'name': role['name'], 'keyids': role['keyids'],
|
||||
'threshold': role['threshold'], 'compressions': [''],
|
||||
'threshold': role['threshold'],
|
||||
'signing_keyids': [], 'signatures': [], 'partial_loaded':False,
|
||||
'delegations': {'keys':{}, 'roles':[]}
|
||||
}
|
||||
|
|
@ -967,9 +963,6 @@ def load_project(project_directory, prefix='', new_targets_location=None,
|
|||
roleinfo['delegations'] = metadata_object['delegations']
|
||||
roleinfo['partial_loaded'] = False
|
||||
|
||||
if os.path.exists(metadata_path+'.gz'):
|
||||
roleinfo['compressions'].append('gz')
|
||||
|
||||
# If the metadata was partially loaded, update the roleinfo flag.
|
||||
if _metadata_is_partially_loaded(metadata_name, signable, roleinfo,
|
||||
repository_name=repository_name):
|
||||
|
|
@ -1003,8 +996,7 @@ def load_project(project_directory, prefix='', new_targets_location=None,
|
|||
rolename = role['name']
|
||||
roleinfo = {'name': role['name'], 'keyids': role['keyids'],
|
||||
'threshold': role['threshold'],
|
||||
'compressions': [''], 'signing_keyids': [],
|
||||
'signatures': [],
|
||||
'signing_keyids': [], 'signatures': [],
|
||||
'partial_loaded': False,
|
||||
'delegations': {'keys': {},
|
||||
'roles': []}}
|
||||
|
|
|
|||
|
|
@ -149,18 +149,6 @@ class UnsupportedLibraryError(Error):
|
|||
pass
|
||||
|
||||
|
||||
class DecompressionError(Error):
|
||||
"""Indicate that some error happened while decompressing a file."""
|
||||
|
||||
def __init__(self, exception):
|
||||
# Store the original exception.
|
||||
self.exception = exception
|
||||
|
||||
def __str__(self):
|
||||
# Show the original exception.
|
||||
return repr(self.exception)
|
||||
|
||||
|
||||
class DownloadError(Error):
|
||||
"""Indicate an error occurred while attempting to download a file."""
|
||||
pass
|
||||
|
|
|
|||
|
|
@ -85,6 +85,13 @@
|
|||
|
||||
import six
|
||||
|
||||
|
||||
# TUF specification version. The constant should be updated when the version
|
||||
# number of the specification changes. All metadata should list this version
|
||||
# number.
|
||||
TUF_VERSION_NUMBER = '1.0'
|
||||
SPECIFICATION_VERSION_SCHEMA = SCHEMA.AnyString()
|
||||
|
||||
# A datetime in 'YYYY-MM-DDTHH:MM:SSZ' ISO 8601 format. The "Z" zone designator
|
||||
# for the zero UTC offset is always used (i.e., a numerical offset is not
|
||||
# supported.) Example: '2015-10-21T13:20:00Z'. Note: This is a simple format
|
||||
|
|
@ -147,10 +154,6 @@
|
|||
# A value that is either True or False, on or off, etc.
|
||||
BOOLEAN_SCHEMA = SCHEMA.Boolean()
|
||||
|
||||
# List of supported compression extensions.
|
||||
COMPRESSIONS_SCHEMA = SCHEMA.ListOf(
|
||||
SCHEMA.OneOf([SCHEMA.String(''), SCHEMA.String('gz')]))
|
||||
|
||||
# A string representing a role's name.
|
||||
ROLENAME_SCHEMA = SCHEMA.AnyString()
|
||||
|
||||
|
|
@ -268,13 +271,6 @@
|
|||
# as requiring them to be a power of 2.
|
||||
NUMBINS_SCHEMA = SCHEMA.Integer(lo=1)
|
||||
|
||||
# Supported compression extension (e.g., 'gz').
|
||||
COMPRESSION_SCHEMA = SCHEMA.OneOf([SCHEMA.String(''), SCHEMA.String('gz')])
|
||||
|
||||
# List of supported compression extensions.
|
||||
COMPRESSIONS_SCHEMA = SCHEMA.ListOf(
|
||||
SCHEMA.OneOf([SCHEMA.String(''), SCHEMA.String('gz')]))
|
||||
|
||||
# The fileinfo format of targets specified in the repository and
|
||||
# developer tools. The second element of this list holds custom data about the
|
||||
# target, such as file permissions, author(s), last modified, etc.
|
||||
|
|
@ -295,7 +291,6 @@
|
|||
version = SCHEMA.Optional(METADATAVERSION_SCHEMA),
|
||||
expires = SCHEMA.Optional(ISO8601_DATETIME_SCHEMA),
|
||||
signatures = SCHEMA.Optional(securesystemslib.formats.SIGNATURES_SCHEMA),
|
||||
compressions = SCHEMA.Optional(COMPRESSIONS_SCHEMA),
|
||||
paths = SCHEMA.Optional(SCHEMA.OneOf([RELPATHS_SCHEMA, PATH_FILEINFO_SCHEMA])),
|
||||
path_hash_prefixes = SCHEMA.Optional(PATH_HASH_PREFIXES_SCHEMA),
|
||||
delegations = SCHEMA.Optional(DELEGATIONS_SCHEMA),
|
||||
|
|
@ -311,9 +306,9 @@
|
|||
ROOT_SCHEMA = SCHEMA.Object(
|
||||
object_name = 'ROOT_SCHEMA',
|
||||
_type = SCHEMA.String('root'),
|
||||
spec_version = SPECIFICATION_VERSION_SCHEMA,
|
||||
version = METADATAVERSION_SCHEMA,
|
||||
consistent_snapshot = BOOLEAN_SCHEMA,
|
||||
compression_algorithms = COMPRESSIONS_SCHEMA,
|
||||
expires = ISO8601_DATETIME_SCHEMA,
|
||||
keys = KEYDICT_SCHEMA,
|
||||
roles = ROLEDICT_SCHEMA)
|
||||
|
|
@ -322,6 +317,7 @@
|
|||
TARGETS_SCHEMA = SCHEMA.Object(
|
||||
object_name = 'TARGETS_SCHEMA',
|
||||
_type = SCHEMA.String('targets'),
|
||||
spec_version = SPECIFICATION_VERSION_SCHEMA,
|
||||
version = METADATAVERSION_SCHEMA,
|
||||
expires = ISO8601_DATETIME_SCHEMA,
|
||||
targets = FILEDICT_SCHEMA,
|
||||
|
|
@ -334,6 +330,7 @@
|
|||
_type = SCHEMA.String('snapshot'),
|
||||
version = securesystemslib.formats.METADATAVERSION_SCHEMA,
|
||||
expires = securesystemslib.formats.ISO8601_DATETIME_SCHEMA,
|
||||
spec_version = SPECIFICATION_VERSION_SCHEMA,
|
||||
meta = FILEINFODICT_SCHEMA)
|
||||
|
||||
# Timestamp role: indicates the latest version of the snapshot file.
|
||||
|
|
@ -458,7 +455,6 @@ def make_signable(object):
|
|||
|
||||
|
||||
|
||||
|
||||
class MetaFile(object):
|
||||
"""
|
||||
<Purpose>
|
||||
|
|
@ -519,6 +515,7 @@ def from_metadata(object):
|
|||
@staticmethod
|
||||
def make_metadata(version, expiration_date, filedict):
|
||||
result = {'_type' : 'timestamp'}
|
||||
result['spec_version'] = TUF_VERSION_NUMBER
|
||||
result['version'] = version
|
||||
result['expires'] = expiration_date
|
||||
result['meta'] = filedict
|
||||
|
|
@ -532,16 +529,13 @@ def make_metadata(version, expiration_date, filedict):
|
|||
|
||||
|
||||
class RootFile(MetaFile):
|
||||
def __init__(self, version, expires, keys, roles, consistent_snapshot,
|
||||
compression_algorithms):
|
||||
def __init__(self, version, expires, keys, roles, consistent_snapshot):
|
||||
self.info = {}
|
||||
self.info['version'] = version
|
||||
self.info['expires'] = expires
|
||||
self.info['keys'] = keys
|
||||
self.info['roles'] = roles
|
||||
self.info['consistent_snapshot'] = consistent_snapshot
|
||||
self.info['compression_algorithms'] = compression_algorithms
|
||||
|
||||
|
||||
@staticmethod
|
||||
def from_metadata(object):
|
||||
|
|
@ -554,22 +548,19 @@ def from_metadata(object):
|
|||
keys = object['keys']
|
||||
roles = object['roles']
|
||||
consistent_snapshot = object['consistent_snapshot']
|
||||
compression_algorithms = object['compression_algorithms']
|
||||
|
||||
return RootFile(version, expires, keys, roles, consistent_snapshot,
|
||||
compression_algorithms)
|
||||
return RootFile(version, expires, keys, roles, consistent_snapshot)
|
||||
|
||||
|
||||
@staticmethod
|
||||
def make_metadata(version, expiration_date, keydict, roledict,
|
||||
consistent_snapshot, compression_algorithms):
|
||||
def make_metadata(version, expiration_date, keydict, roledict, consistent_snapshot):
|
||||
result = {'_type' : 'root'}
|
||||
result['spec_version'] = TUF_VERSION_NUMBER
|
||||
result['version'] = version
|
||||
result['expires'] = expiration_date
|
||||
result['keys'] = keydict
|
||||
result['roles'] = roledict
|
||||
result['consistent_snapshot'] = consistent_snapshot
|
||||
result['compression_algorithms'] = compression_algorithms
|
||||
|
||||
# Is 'result' a Root metadata file?
|
||||
# Raise 'securesystemslib.exceptions.FormatError' if not.
|
||||
|
|
@ -604,6 +595,7 @@ def from_metadata(object):
|
|||
@staticmethod
|
||||
def make_metadata(version, expiration_date, versiondict):
|
||||
result = {'_type' : 'snapshot'}
|
||||
result['spec_version'] = TUF_VERSION_NUMBER
|
||||
result['version'] = version
|
||||
result['expires'] = expiration_date
|
||||
result['meta'] = versiondict
|
||||
|
|
@ -651,9 +643,11 @@ def make_metadata(version, expiration_date, filedict=None, delegations=None):
|
|||
' empty targets metadata.')
|
||||
|
||||
result = {'_type' : 'targets'}
|
||||
result['spec_version'] = TUF_VERSION_NUMBER
|
||||
result['version'] = version
|
||||
result['expires'] = expiration_date
|
||||
result['targets'] = {}
|
||||
|
||||
if filedict is not None:
|
||||
result['targets'] = filedict
|
||||
if delegations is not None:
|
||||
|
|
|
|||
|
|
@ -96,9 +96,6 @@
|
|||
# Supported key types.
|
||||
SUPPORTED_KEY_TYPES = ['rsa', 'ed25519']
|
||||
|
||||
# The recognized compression extensions.
|
||||
SUPPORTED_COMPRESSION_EXTENSIONS = ['.gz']
|
||||
|
||||
# The full list of supported TUF metadata extensions.
|
||||
METADATA_EXTENSIONS = ['.json.gz', '.json']
|
||||
|
||||
|
|
@ -107,12 +104,9 @@
|
|||
|
||||
|
||||
def _generate_and_write_metadata(rolename, metadata_filename,
|
||||
targets_directory, metadata_directory,
|
||||
consistent_snapshot=False, filenames=None,
|
||||
compression_algorithms=['gz'],
|
||||
allow_partially_signed=False,
|
||||
increment_version_number=True,
|
||||
repository_name='default'):
|
||||
targets_directory, metadata_directory, consistent_snapshot=False,
|
||||
filenames=None, allow_partially_signed=False, increment_version_number=True,
|
||||
repository_name='default'):
|
||||
"""
|
||||
Non-public function that can generate and write the metadata for the
|
||||
specified 'rolename'. It also increments the version number of 'rolename' if
|
||||
|
|
@ -131,7 +125,7 @@ def _generate_and_write_metadata(rolename, metadata_filename,
|
|||
# Generate the appropriate role metadata for 'rolename'.
|
||||
if rolename == 'root':
|
||||
metadata = generate_root_metadata(roleinfo['version'], roleinfo['expires'],
|
||||
consistent_snapshot, compression_algorithms, repository_name)
|
||||
consistent_snapshot, repository_name)
|
||||
|
||||
_log_warning_if_expires_soon(ROOT_FILENAME, roleinfo['expires'],
|
||||
ROOT_EXPIRES_WARN_SECONDS)
|
||||
|
|
@ -221,7 +215,7 @@ def should_write():
|
|||
if rolename == 'root':
|
||||
consistent_snapshot = True
|
||||
filename = write_metadata_file(signable, metadata_filename,
|
||||
metadata['version'], compression_algorithms, consistent_snapshot)
|
||||
metadata['version'], consistent_snapshot)
|
||||
|
||||
# 'signable' contains an invalid threshold of signatures.
|
||||
else:
|
||||
|
|
@ -247,13 +241,11 @@ def should_write():
|
|||
# <version>.root.json and root.json).
|
||||
if rolename == 'root':
|
||||
filename = write_metadata_file(signable, metadata_filename,
|
||||
metadata['version'], compression_algorithms,
|
||||
consistent_snapshot=True)
|
||||
metadata['version'], consistent_snapshot=True)
|
||||
|
||||
else:
|
||||
filename = write_metadata_file(signable, metadata_filename,
|
||||
metadata['version'], compression_algorithms,
|
||||
consistent_snapshot)
|
||||
metadata['version'], consistent_snapshot)
|
||||
|
||||
return signable, filename
|
||||
|
||||
|
|
@ -580,9 +572,6 @@ def _load_top_level_metadata(repository, top_level_filenames, repository_name):
|
|||
logger.debug('Found a Root signature that is already loaded:'
|
||||
' ' + repr(signature))
|
||||
|
||||
if os.path.exists(root_filename + '.gz'):
|
||||
roleinfo['compressions'].append('gz')
|
||||
|
||||
else:
|
||||
logger.debug('A compressed Root file was not found.')
|
||||
|
||||
|
|
@ -620,11 +609,6 @@ def _load_top_level_metadata(repository, top_level_filenames, repository_name):
|
|||
roleinfo = tuf.roledb.get_roleinfo('timestamp', repository_name)
|
||||
roleinfo['expires'] = timestamp_metadata['expires']
|
||||
roleinfo['version'] = timestamp_metadata['version']
|
||||
if os.path.exists(timestamp_filename + '.gz'):
|
||||
roleinfo['compressions'].append('gz')
|
||||
|
||||
else:
|
||||
logger.debug('A compressed Timestamp file was not found.')
|
||||
|
||||
if _metadata_is_partially_loaded('timestamp', signable, roleinfo, repository_name):
|
||||
roleinfo['partial_loaded'] = True
|
||||
|
|
@ -665,11 +649,6 @@ def _load_top_level_metadata(repository, top_level_filenames, repository_name):
|
|||
roleinfo = tuf.roledb.get_roleinfo('snapshot', repository_name)
|
||||
roleinfo['expires'] = snapshot_metadata['expires']
|
||||
roleinfo['version'] = snapshot_metadata['version']
|
||||
if os.path.exists(snapshot_filename + '.gz'):
|
||||
roleinfo['compressions'].append('gz')
|
||||
|
||||
else:
|
||||
logger.debug('A compressed Snapshot file was not loaded.')
|
||||
|
||||
if _metadata_is_partially_loaded('snapshot', signable, roleinfo, repository_name):
|
||||
roleinfo['partial_loaded'] = True
|
||||
|
|
@ -708,11 +687,6 @@ def _load_top_level_metadata(repository, top_level_filenames, repository_name):
|
|||
roleinfo['version'] = targets_metadata['version']
|
||||
roleinfo['expires'] = targets_metadata['expires']
|
||||
roleinfo['delegations'] = targets_metadata['delegations']
|
||||
if os.path.exists(targets_filename + '.gz'):
|
||||
roleinfo['compressions'].append('gz')
|
||||
|
||||
else:
|
||||
logger.debug('Compressed Targets file cannot be loaded.')
|
||||
|
||||
if _metadata_is_partially_loaded('targets', signable, roleinfo, repository_name):
|
||||
roleinfo['partial_loaded'] = True
|
||||
|
|
@ -1226,7 +1200,7 @@ def get_target_hash(target_filepath):
|
|||
|
||||
|
||||
def generate_root_metadata(version, expiration_date, consistent_snapshot,
|
||||
compression_algorithms=['gz'], repository_name='default'):
|
||||
repository_name='default'):
|
||||
"""
|
||||
<Purpose>
|
||||
Create the root metadata. 'tuf.roledb.py' and 'tuf.keydb.py'
|
||||
|
|
@ -1248,11 +1222,6 @@ def generate_root_metadata(version, expiration_date, consistent_snapshot,
|
|||
filename of any target file located in the targets directory. Each digest
|
||||
is stripped from the target filename and listed in the snapshot metadata.
|
||||
|
||||
compression_algorithms:
|
||||
A list of compression algorithms to use when generating the compressed
|
||||
metadata files for the repository. The root file specifies the
|
||||
algorithms used by the repository.
|
||||
|
||||
repository_name:
|
||||
The name of the repository. If not supplied, 'rolename' is added to the
|
||||
'default' repository.
|
||||
|
|
@ -1280,7 +1249,6 @@ def generate_root_metadata(version, expiration_date, consistent_snapshot,
|
|||
tuf.formats.METADATAVERSION_SCHEMA.check_match(version)
|
||||
securesystemslib.formats.ISO8601_DATETIME_SCHEMA.check_match(expiration_date)
|
||||
securesystemslib.formats.BOOLEAN_SCHEMA.check_match(consistent_snapshot)
|
||||
tuf.formats.COMPRESSIONS_SCHEMA.check_match(compression_algorithms)
|
||||
securesystemslib.formats.NAME_SCHEMA.check_match(repository_name)
|
||||
|
||||
# The role and key dictionaries to be saved in the root metadata object.
|
||||
|
|
@ -1343,9 +1311,7 @@ def generate_root_metadata(version, expiration_date, consistent_snapshot,
|
|||
|
||||
# Generate the root metadata object.
|
||||
root_metadata = tuf.formats.RootFile.make_metadata(version, expiration_date,
|
||||
keydict, roledict,
|
||||
consistent_snapshot,
|
||||
compression_algorithms)
|
||||
keydict, roledict, consistent_snapshot)
|
||||
|
||||
return root_metadata
|
||||
|
||||
|
|
@ -1754,16 +1720,10 @@ def sign_metadata(metadata_object, keyids, filename, repository_name):
|
|||
|
||||
|
||||
|
||||
def write_metadata_file(metadata, filename, version_number,
|
||||
compression_algorithms, consistent_snapshot):
|
||||
def write_metadata_file(metadata, filename, version_number, consistent_snapshot):
|
||||
"""
|
||||
<Purpose>
|
||||
If necessary, write the 'metadata' signable object to 'filename', and the
|
||||
compressed version of the metadata file if 'compression' is set.
|
||||
|
||||
Note: Compression algorithms like gzip attach a timestamp to compressed
|
||||
files, so a metadata file compressed multiple times may generate different
|
||||
digests even though the uncompressed content has not changed.
|
||||
If necessary, write the 'metadata' signable object to 'filename'.
|
||||
|
||||
<Arguments>
|
||||
metadata:
|
||||
|
|
@ -1772,27 +1732,22 @@ def write_metadata_file(metadata, filename, version_number,
|
|||
|
||||
filename:
|
||||
The filename of the metadata to be written (e.g., 'root.json').
|
||||
If a compression algorithm is specified in 'compression_algorithms', the
|
||||
compression extention is appended to 'filename'.
|
||||
|
||||
version_number:
|
||||
The version number of the metadata file to be written. The version
|
||||
number is needed for consistent snapshots, which prepend the version
|
||||
number to 'filename'.
|
||||
|
||||
compression_algorithms:
|
||||
Specify the algorithms, as a list of strings, used to compress the
|
||||
'metadata'; The only currently available compression option is 'gz'
|
||||
(gzip).
|
||||
|
||||
consistent_snapshot:
|
||||
Boolean that determines whether the metadata file's digest should be
|
||||
prepended to the filename.
|
||||
|
||||
<Exceptions>
|
||||
securesystemslib.exceptions.FormatError, if the arguments are improperly formatted.
|
||||
securesystemslib.exceptions.FormatError, if the arguments are improperly
|
||||
formatted.
|
||||
|
||||
securesystemslib.exceptions.Error, if the directory of 'filename' does not exist.
|
||||
securesystemslib.exceptions.Error, if the directory of 'filename' does not
|
||||
exist.
|
||||
|
||||
Any other runtime (e.g., IO) exception.
|
||||
|
||||
|
|
@ -1811,7 +1766,6 @@ def write_metadata_file(metadata, filename, version_number,
|
|||
tuf.formats.SIGNABLE_SCHEMA.check_match(metadata)
|
||||
securesystemslib.formats.PATH_SCHEMA.check_match(filename)
|
||||
tuf.formats.METADATAVERSION_SCHEMA.check_match(version_number)
|
||||
tuf.formats.COMPRESSIONS_SCHEMA.check_match(compression_algorithms)
|
||||
securesystemslib.formats.BOOLEAN_SCHEMA.check_match(consistent_snapshot)
|
||||
|
||||
# Verify the directory of 'filename', and convert 'filename' to its absolute
|
||||
|
|
@ -1878,115 +1832,21 @@ def write_metadata_file(metadata, filename, version_number,
|
|||
os.link(written_consistent_filename, written_filename)
|
||||
|
||||
else:
|
||||
raise securesystemslib.exceptions.InvalidConfigurationError('The consistent method specified'
|
||||
' in tuf.settings.py is not supported, try either "copy" or "hard_link"')
|
||||
raise securesystemslib.exceptions.InvalidConfigurationError('The'
|
||||
' consistent method specified in tuf.settings.py is not supported, try'
|
||||
' either "copy" or "hard_link"')
|
||||
|
||||
else:
|
||||
logger.debug('Not creating a consistent snapshot for ' + repr(written_filename))
|
||||
logger.debug('Saving ' + repr(written_filename))
|
||||
file_object.move(written_filename)
|
||||
|
||||
# Generate the compressed versions of 'metadata', if necessary. A compressed
|
||||
# file may be written (without needing to write the uncompressed version) if
|
||||
# the repository maintainer adds compression after writing the uncompressed
|
||||
# version.
|
||||
for compression_algorithm in compression_algorithms:
|
||||
file_object = None
|
||||
|
||||
# Ignore the empty string that signifies non-compression. The uncompressed
|
||||
# file was previously written above, if necessary.
|
||||
if not len(compression_algorithm):
|
||||
continue
|
||||
|
||||
elif compression_algorithm == 'gz':
|
||||
file_object = securesystemslib.util.TempFile()
|
||||
compressed_filename = filename + '.gz'
|
||||
|
||||
# Instantiate a gzip object, but save compressed content to
|
||||
# 'file_object' (i.e., GzipFile instance is based on its 'fileobj'
|
||||
# argument).
|
||||
gzip_object = gzip.GzipFile(fileobj=file_object, mode='wb')
|
||||
try:
|
||||
gzip_object.write(file_content)
|
||||
|
||||
finally:
|
||||
gzip_object.close()
|
||||
|
||||
# This else clause should not be reached because the
|
||||
# 'compression_algorithms' list is validated against the
|
||||
# COMPRESSIONS_SCHEMA above.
|
||||
else: # pragma: no cover
|
||||
raise securesystemslib.exceptions.FormatError('Unknown compression algorithm:'
|
||||
' ' + repr(compression_algorithm))
|
||||
|
||||
# Save the compressed version, ensuring an unchanged file is not re-saved.
|
||||
# Re-saving the same compressed version may cause its digest to
|
||||
# unexpectedly change (gzip includes a timestamp) even though content has
|
||||
# not changed.
|
||||
_write_compressed_metadata(file_object, compressed_filename,
|
||||
True, consistent_snapshot,
|
||||
version_number)
|
||||
return written_filename
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
def _write_compressed_metadata(file_object, compressed_filename,
|
||||
write_new_metadata, consistent_snapshot, version_number):
|
||||
"""
|
||||
Write compressed versions of metadata, ensuring compressed file that have
|
||||
not changed are not re-written, the digest of the compressed file is properly
|
||||
added to the compressed filename, and consistent snapshots are also saved.
|
||||
Ensure compressed files are written to a temporary location, and then
|
||||
moved to their destinations.
|
||||
"""
|
||||
|
||||
# If a consistent snapshot is unneeded, 'file_object' may be simply moved
|
||||
# 'compressed_filename' if not already written.
|
||||
if not consistent_snapshot:
|
||||
if write_new_metadata or not os.path.exists(compressed_filename):
|
||||
file_object.move(compressed_filename)
|
||||
|
||||
# The temporary file must be closed if 'file_object.move()' is not used.
|
||||
# securesystemslib.util.TempFile() automatically closes the temp file when move() is
|
||||
# called
|
||||
else:
|
||||
file_object.close_temp_file()
|
||||
|
||||
# consistent snapshots = True. Ensure the version number is included in the
|
||||
# compressed filename written, provided it does not already exist.
|
||||
else:
|
||||
compressed_content = file_object.read()
|
||||
consistent_filename = None
|
||||
version_and_filename = None
|
||||
|
||||
# Attach the version number to the compressed, consistent snapshot filename.
|
||||
dirname, basename = os.path.split(compressed_filename)
|
||||
|
||||
for compression_extension in SUPPORTED_COMPRESSION_EXTENSIONS:
|
||||
if basename.endswith(compression_extension):
|
||||
basename = basename.split(compression_extension, 1)[0]
|
||||
version_and_filename = str(version_number) + '.' + basename + compression_extension
|
||||
consistent_filename = os.path.join(dirname, version_and_filename)
|
||||
|
||||
else:
|
||||
logger.debug('Skipping compression extension: ' + repr(compression_extension))
|
||||
|
||||
# Move the 'securesystemslib.util.TempFile' object to one of the filenames so that it is
|
||||
# saved and the temporary file closed.
|
||||
if not os.path.exists(consistent_filename):
|
||||
logger.debug('Saving ' + repr(consistent_filename))
|
||||
file_object.move(consistent_filename)
|
||||
|
||||
else:
|
||||
logger.debug('Skipping already written compressed file:'
|
||||
' ' + repr(consistent_filename))
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
def _log_status_of_top_level_roles(targets_directory, metadata_directory,
|
||||
repository_name):
|
||||
"""
|
||||
|
|
|
|||
|
|
@ -186,7 +186,7 @@ def __init__(self, repository_directory, metadata_directory,
|
|||
|
||||
|
||||
|
||||
def writeall(self, consistent_snapshot=False, compression_algorithms=['gz']):
|
||||
def writeall(self, consistent_snapshot=False):
|
||||
"""
|
||||
<Purpose>
|
||||
Write all the JSON Metadata objects to their corresponding files.
|
||||
|
|
@ -202,11 +202,6 @@ def writeall(self, consistent_snapshot=False, compression_algorithms=['gz']):
|
|||
<version_number>.README.json
|
||||
Example: 13.root.json'
|
||||
|
||||
compression_algorithms:
|
||||
A list of compression algorithms. Each of these algorithms will be
|
||||
used to compress all of the metadata available on the repository.
|
||||
By default, all metadata is compressed with gzip.
|
||||
|
||||
<Exceptions>
|
||||
tuf.exceptions.UnsignedMetadataError, if any of the top-level
|
||||
and delegated roles do not have the minimum threshold of signatures.
|
||||
|
|
@ -224,7 +219,6 @@ def writeall(self, consistent_snapshot=False, compression_algorithms=['gz']):
|
|||
# 'securesystemslib.exceptions.FormatError' if any are improperly
|
||||
# formatted.
|
||||
securesystemslib.formats.BOOLEAN_SCHEMA.check_match(consistent_snapshot)
|
||||
tuf.formats.COMPRESSIONS_SCHEMA.check_match(compression_algorithms)
|
||||
|
||||
# At this point, tuf.keydb and tuf.roledb must be fully populated,
|
||||
# otherwise writeall() throws a 'tuf.exceptions.UnsignedMetadataError' for
|
||||
|
|
@ -557,8 +551,8 @@ class Metadata(object):
|
|||
top-level roles: Root, Targets, Snapshot, and Timestamp. The Metadata
|
||||
class provides methods that are needed by all top-level roles, such as
|
||||
adding and removing public keys, private keys, and signatures. Metadata
|
||||
attributes, such as rolename, version, threshold, expiration, key list, and
|
||||
compressions, is also provided by the Metadata base class.
|
||||
attributes, such as rolename, version, threshold, expiration, and key list
|
||||
are also provided by the Metadata base class.
|
||||
|
||||
<Arguments>
|
||||
None.
|
||||
|
|
@ -1325,87 +1319,6 @@ def signing_keys(self):
|
|||
|
||||
|
||||
|
||||
@property
|
||||
def compressions(self):
|
||||
"""
|
||||
<Purpose>
|
||||
A getter method that returns a list of the file compression algorithms
|
||||
used when the metadata is written to disk. If ['gz'] is set for the
|
||||
'targets.json' role, the metadata files 'targets.json' and
|
||||
'targets.json.gz' are written.
|
||||
|
||||
>>>
|
||||
>>>
|
||||
>>>
|
||||
|
||||
<Arguments>
|
||||
None.
|
||||
|
||||
<Exceptions>
|
||||
None.
|
||||
|
||||
<Side Effects>
|
||||
None.
|
||||
|
||||
<Returns>
|
||||
A list of compression algorithms, conformant to
|
||||
'tuf.formats.COMPRESSIONS_SCHEMA'.
|
||||
"""
|
||||
|
||||
roleinfo = tuf.roledb.get_roleinfo(self.rolename, self._repository_name)
|
||||
compressions = roleinfo['compressions']
|
||||
|
||||
return compressions
|
||||
|
||||
|
||||
|
||||
@compressions.setter
|
||||
def compressions(self, compression_list):
|
||||
"""
|
||||
<Purpose>
|
||||
A setter method for the file compression algorithms used when the
|
||||
metadata is written to disk. If ['gz'] is set for the 'targets.json' role
|
||||
the metadata files 'targets.json' and 'targets.json.gz' are written.
|
||||
|
||||
>>>
|
||||
>>>
|
||||
>>>
|
||||
|
||||
<Arguments>
|
||||
compression_list:
|
||||
A list of file compression algorithms, conformant to
|
||||
'tuf.formats.COMPRESSIONS_SCHEMA'.
|
||||
|
||||
<Exceptions>
|
||||
securesystemslib.exceptions.FormatError, if 'compression_list' is
|
||||
improperly formatted.
|
||||
|
||||
<Side Effects>
|
||||
Updates the role's compression algorithms listed in 'tuf.roledb.py'.
|
||||
|
||||
<Returns>
|
||||
None.
|
||||
"""
|
||||
|
||||
# Does 'compression_name' have the correct format?
|
||||
# Ensure the arguments have the appropriate number of objects and object
|
||||
# types, and that all dict keys are properly named. Raise
|
||||
# 'securesystemslib.exceptions.FormatError' if any are improperly formatted.
|
||||
tuf.formats.COMPRESSIONS_SCHEMA.check_match(compression_list)
|
||||
|
||||
roleinfo = tuf.roledb.get_roleinfo(self.rolename, self._repository_name)
|
||||
|
||||
# Add the compression algorithms of 'compression_list' to the role's
|
||||
# entry in 'tuf.roledb.py'.
|
||||
for compression in compression_list:
|
||||
if compression not in roleinfo['compressions']:
|
||||
roleinfo['compressions'].append(compression)
|
||||
|
||||
tuf.roledb.update_roleinfo(self.rolename, roleinfo,
|
||||
repository_name=self._repository_name)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
class Root(Metadata):
|
||||
|
|
@ -1460,8 +1373,7 @@ def __init__(self, repository_name):
|
|||
|
||||
roleinfo = {'keyids': [], 'signing_keyids': [], 'threshold': 1,
|
||||
'signatures': [], 'version': 0, 'consistent_snapshot': False,
|
||||
'compressions': [''], 'expires': expiration,
|
||||
'partial_loaded': False}
|
||||
'expires': expiration, 'partial_loaded': False}
|
||||
try:
|
||||
tuf.roledb.add_role(self._rolename, roleinfo, self._repository_name)
|
||||
|
||||
|
|
@ -1528,8 +1440,8 @@ def __init__(self, repository_name):
|
|||
expiration = expiration.isoformat() + 'Z'
|
||||
|
||||
roleinfo = {'keyids': [], 'signing_keyids': [], 'threshold': 1,
|
||||
'signatures': [], 'version': 0, 'compressions': [''],
|
||||
'expires': expiration, 'partial_loaded': False}
|
||||
'signatures': [], 'version': 0, 'expires': expiration,
|
||||
'partial_loaded': False}
|
||||
|
||||
try:
|
||||
tuf.roledb.add_role(self.rolename, roleinfo, self._repository_name)
|
||||
|
|
@ -1591,8 +1503,8 @@ def __init__(self, repository_name):
|
|||
expiration = expiration.isoformat() + 'Z'
|
||||
|
||||
roleinfo = {'keyids': [], 'signing_keyids': [], 'threshold': 1,
|
||||
'signatures': [], 'version': 0, 'compressions': [''],
|
||||
'expires': expiration, 'partial_loaded': False}
|
||||
'signatures': [], 'version': 0, 'expires': expiration,
|
||||
'partial_loaded': False}
|
||||
|
||||
try:
|
||||
tuf.roledb.add_role(self._rolename, roleinfo, self._repository_name)
|
||||
|
|
@ -1696,7 +1608,7 @@ def __init__(self, targets_directory, rolename='targets', roleinfo=None,
|
|||
# If 'roleinfo' is not provided, set an initial default.
|
||||
if roleinfo is None:
|
||||
roleinfo = {'keyids': [], 'signing_keyids': [], 'threshold': 1,
|
||||
'version': 0, 'compressions': [''], 'expires': expiration,
|
||||
'version': 0, 'expires': expiration,
|
||||
'signatures': [], 'paths': {}, 'path_hash_prefixes': [],
|
||||
'partial_loaded': False, 'delegations': {'keys': {},
|
||||
'roles': []}}
|
||||
|
|
@ -2371,7 +2283,7 @@ def delegate(self, rolename, public_keys, list_of_targets, threshold=1,
|
|||
expiration = expiration.isoformat() + 'Z'
|
||||
|
||||
roleinfo = {'name': rolename, 'keyids': keyids, 'signing_keyids': [],
|
||||
'threshold': threshold, 'version': 0, 'compressions': [''],
|
||||
'threshold': threshold, 'version': 0,
|
||||
'expires': expiration, 'signatures': [], 'partial_loaded': False,
|
||||
'paths': relative_targetpaths, 'delegations': {'keys': {},
|
||||
'roles': []}}
|
||||
|
|
@ -3096,7 +3008,6 @@ def load_repository(repository_directory, repository_name='default'):
|
|||
'signing_keyids': [],
|
||||
'signatures': [],
|
||||
'partial_loaded': False,
|
||||
'compressions': [],
|
||||
'paths': {},
|
||||
}
|
||||
|
||||
|
|
@ -3108,12 +3019,6 @@ def load_repository(repository_directory, repository_name='default'):
|
|||
roleinfo['paths'].update({filepath: fileinfo.get('custom', {})})
|
||||
roleinfo['delegations'] = metadata_object['delegations']
|
||||
|
||||
if os.path.exists(metadata_path + '.gz'):
|
||||
roleinfo['compressions'].append('gz')
|
||||
|
||||
else:
|
||||
logger.debug('A compressed version does not exist.')
|
||||
|
||||
tuf.roledb.add_role(metadata_name, roleinfo, repository_name)
|
||||
loaded_metadata.append(metadata_name)
|
||||
|
||||
|
|
|
|||
|
|
@ -134,7 +134,6 @@ def create_roledb_from_root_metadata(root_metadata, repository_name='default'):
|
|||
|
||||
roleinfo['signatures'] = []
|
||||
roleinfo['signing_keyids'] = []
|
||||
roleinfo['compressions'] = ['']
|
||||
roleinfo['partial_loaded'] = False
|
||||
|
||||
if rolename.startswith('targets'):
|
||||
|
|
|
|||
Loading…
Reference in a new issue