Merge branch 'develop' of github.com:theupdateframework/tuf into 1.0-review2

This commit is contained in:
Vladimir Diaz 2016-01-27 17:11:50 -05:00
commit c2d1a0e9d6
63 changed files with 962 additions and 670 deletions

View file

@ -11,7 +11,6 @@ include tests/repository_data/keystore/targets_key
include tests/repository_data/keystore/timestamp_key
include tuf/_vendor/ed25519/test_data/ed25519
include tuf/_vendor/ed25519/LICENSE
include tuf/_vendor/iso8601/LICENSE
recursive-include docs *.txt
recursive-include docs/papers *.pdf

View file

@ -80,12 +80,12 @@
setup(
name = 'tuf',
version = '0.9.9',
version = '0.10.0',
description = 'A secure updater framework for Python',
long_description = long_description,
author = 'http://www.theupdateframework.com',
author = 'https://www.updateframework.com',
author_email = 'theupdateframework@googlegroups.com',
url = 'http://www.theupdateframework.com',
url = 'https://www.updateframework.com',
keywords = 'update updater secure authentication key compromise revocation',
classifiers = [
'Development Status :: 4 - Beta',
@ -101,9 +101,9 @@
'Programming Language :: Python :: 2.6',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.2',
'Programming Language :: Python :: 3.3',
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: Implementation :: CPython',
'Topic :: Security',
'Topic :: Software Development'

View file

@ -1,15 +0,0 @@
-----BEGIN CERTIFICATE-----
MIICbDCCAdWgAwIBAgIJANAI4zEreOenMA0GCSqGSIb3DQEBBQUAME8xCzAJBgNV
BAYTAlVTMQswCQYDVQQIDAJOWTERMA8GA1UEBwwIQnJvb2tseW4xDDAKBgNVBAoM
A05ZVTESMBAGA1UEAwwJbG9jYWxob3N0MB4XDTE0MDYxNzEyMzUxOFoXDTE1MDYx
NzEyMzUxOFowTzELMAkGA1UEBhMCVVMxCzAJBgNVBAgMAk5ZMREwDwYDVQQHDAhC
cm9va2x5bjEMMAoGA1UECgwDTllVMRIwEAYDVQQDDAlsb2NhbGhvc3QwgZ8wDQYJ
KoZIhvcNAQEBBQADgY0AMIGJAoGBAOMQNpdLIX2z0xcnQmroU5qnwaOPdP4Wy+Kz
QSleHj8Ny/iC/24uZ/wu8Dt0Zru/yUOPSnzA2BWfie9jYK4bmRdChYm7fI+WZekj
JZtmrdQpCexYxNqxHuDNL+OoNmGVspRwsBWyyInoxhPfd8y37nVRE5O/+CeFpk9k
TDTeKbs9AgMBAAGjUDBOMB0GA1UdDgQWBBQeAkYt0Yip/L9+SXYpOFpL2ZwOuzAf
BgNVHSMEGDAWgBQeAkYt0Yip/L9+SXYpOFpL2ZwOuzAMBgNVHRMEBTADAQH/MA0G
CSqGSIb3DQEBBQUAA4GBAOFgy2N+ZFlR/tGMd7HHXS2vmNNa52ItWK+96YrbroKO
Izx91LG/QKEyeBXlAGTGGILK4s3v7sJd0Mmg10XwhMqLLUwpOZ4kLzo3GNxhYr5J
UDA+M/0OdPjqWZ7R6B7pM2kubAva17CvGomho34Is64kq5cBTzs90FdmJovOK40K
-----END CERTIFICATE-----

View file

@ -1,31 +0,0 @@
-----BEGIN PRIVATE KEY-----
MIICdwIBADANBgkqhkiG9w0BAQEFAASCAmEwggJdAgEAAoGBAOMQNpdLIX2z0xcn
QmroU5qnwaOPdP4Wy+KzQSleHj8Ny/iC/24uZ/wu8Dt0Zru/yUOPSnzA2BWfie9j
YK4bmRdChYm7fI+WZekjJZtmrdQpCexYxNqxHuDNL+OoNmGVspRwsBWyyInoxhPf
d8y37nVRE5O/+CeFpk9kTDTeKbs9AgMBAAECgYA7G2lakPAy7LCygkHD2p6Iz8qU
bS+jRQPmC4uW3S06dLj4BAHCCMqA1ycqEu77SL13nMFjIEAfwNwDOPdd20lKSygK
vNFKw6wWYzvB9GHFPUKUOAelKrwsaXmYfROOn51A03Uf+dLjEtY5tsxXn47neq+G
iB2WLSk0Vl6YYI9U8QJBAPnyNRWydbz2CTW+nf/jxthcGXT2EBqpZ3HkutCa6YMb
YIrQ5CbSzpdGYY1iNI1i2r7BLwN7qUeMTxvJ4raP9jcCQQDokB75SBixx1ulV4vG
aSZcBrJCC+/yVaym5JwqzJKdi/jDoRPpMYBo1BTNwwacXzNtbCATKIvEQ2URqeUq
L6ArAkAeFlLfjsDvgypupsh8Mh4Qk12ZH7mmi/fg1OjMDanIV3ZSn3ynU778pMM/
cq/iySCNz9Fp+OvSqggnzzCUS1YXAkEApDFMjPcn6Cw2OhALMTP/zy0zIYpICDIQ
yWvSDi2MvgqawZOx+Qvn+xrw7SzqN/DG4FRceOpBc3mZm9T1ZMlnLQJBANoVg0kj
EkaLBX8+73Cl+ERoD6Z/ylCB8twWiedvCbORPf9RTZjQbB2fQ6ay9xS3DMFfnU6R
LJYfLnt2X1E6++c=
-----END PRIVATE KEY-----
-----BEGIN CERTIFICATE-----
MIICbDCCAdWgAwIBAgIJANAI4zEreOenMA0GCSqGSIb3DQEBBQUAME8xCzAJBgNV
BAYTAlVTMQswCQYDVQQIDAJOWTERMA8GA1UEBwwIQnJvb2tseW4xDDAKBgNVBAoM
A05ZVTESMBAGA1UEAwwJbG9jYWxob3N0MB4XDTE0MDYxNzEyMzUxOFoXDTE1MDYx
NzEyMzUxOFowTzELMAkGA1UEBhMCVVMxCzAJBgNVBAgMAk5ZMREwDwYDVQQHDAhC
cm9va2x5bjEMMAoGA1UECgwDTllVMRIwEAYDVQQDDAlsb2NhbGhvc3QwgZ8wDQYJ
KoZIhvcNAQEBBQADgY0AMIGJAoGBAOMQNpdLIX2z0xcnQmroU5qnwaOPdP4Wy+Kz
QSleHj8Ny/iC/24uZ/wu8Dt0Zru/yUOPSnzA2BWfie9jYK4bmRdChYm7fI+WZekj
JZtmrdQpCexYxNqxHuDNL+OoNmGVspRwsBWyyInoxhPfd8y37nVRE5O/+CeFpk9k
TDTeKbs9AgMBAAGjUDBOMB0GA1UdDgQWBBQeAkYt0Yip/L9+SXYpOFpL2ZwOuzAf
BgNVHSMEGDAWgBQeAkYt0Yip/L9+SXYpOFpL2ZwOuzAMBgNVHRMEBTADAQH/MA0G
CSqGSIb3DQEBBQUAA4GBAOFgy2N+ZFlR/tGMd7HHXS2vmNNa52ItWK+96YrbroKO
Izx91LG/QKEyeBXlAGTGGILK4s3v7sJd0Mmg10XwhMqLLUwpOZ4kLzo3GNxhYr5J
UDA+M/0OdPjqWZ7R6B7pM2kubAva17CvGomho34Is64kq5cBTzs90FdmJovOK40K
-----END CERTIFICATE-----

View file

@ -60,7 +60,8 @@ def _generate_random_port():
httpd = six.moves.BaseHTTPServer.HTTPServer(('localhost', PORT),
six.moves.SimpleHTTPServer.SimpleHTTPRequestHandler)
httpd.socket = ssl.wrap_socket(httpd.socket, certfile='https_server.pem',
httpd.socket = ssl.wrap_socket(httpd.socket, keyfile='ssl_cert.key',
certfile='ssl_cert.crt',
server_side=True)
#print('Starting https server on port: ' + str(PORT))

28
tests/ssl_cert.crt Normal file
View file

@ -0,0 +1,28 @@
-----BEGIN CERTIFICATE-----
MIIE1TCCAz2gAwIBAgIJAKqz8ew7Z44mMA0GCSqGSIb3DQEBCwUAMIGAMQswCQYD
VQQGEwJVUzERMA8GA1UECAwITmV3IFlvcmsxETAPBgNVBAcMCEJyb29rbHluMQww
CgYDVQQKDANOWVUxKTAnBgNVBAsMIENvbXB1dGVyIFNjaWVuY2UgYW5kIEVuZ2lu
ZWVyaW5nMRIwEAYDVQQDDAlsb2NhbGhvc3QwHhcNMTYwMTI3MjEyMTMxWhcNMjYw
MTI0MjEyMTMxWjCBgDELMAkGA1UEBhMCVVMxETAPBgNVBAgMCE5ldyBZb3JrMREw
DwYDVQQHDAhCcm9va2x5bjEMMAoGA1UECgwDTllVMSkwJwYDVQQLDCBDb21wdXRl
ciBTY2llbmNlIGFuZCBFbmdpbmVlcmluZzESMBAGA1UEAwwJbG9jYWxob3N0MIIB
ojANBgkqhkiG9w0BAQEFAAOCAY8AMIIBigKCAYEAxyFVeRsWnb1UlCKBks2azM9W
9K+J/ZkzdSb6eCxOIxv79M/Ug54CfWqkySSaQejsu0U/gJxkFYRvwQAy5lATrspY
2kyiWYiggWXFDWz+i8ETPkL9zn59v13sNIpT/IXQj0S3Mr9ZnsUn1qCyEOOIxJxZ
lyuV/M/XP1DP4tArhEvrex12V6MQIK+8fYzEjHG/W7vIIet+wTStIR8ArvVQi0Kv
PbbGCfrZ+e+gq+UpBLBuAfMzM95TW+YJ5duMchie2n6LDmOeegA4jMEv2ppeOr8Q
JJtZuKpXWVbJvLg81yrDjr1rAwJR/WQrnk8GQWPCyPLneAA4mJbi75LqjLxn0AoJ
b3kzLfGEMJJEWXspxNg06bLQU948hB4L7nKARq6s7KoESjEV+/L4koMPWJoNq6fx
OUVw2+S3ITNrDctecRQ1j3RGVPaj5l6bn03C7KV9uRrfqFY3OUjn7A0kDczvRnmr
e1BZIpe+mfGFB+Uu7JiQoBv6I6fqyrdH9rX1LUKlAgMBAAGjUDBOMB0GA1UdDgQW
BBT8LvRkvodP9bR/bBs/aI+AydRIvTAfBgNVHSMEGDAWgBT8LvRkvodP9bR/bBs/
aI+AydRIvTAMBgNVHRMEBTADAQH/MA0GCSqGSIb3DQEBCwUAA4IBgQC6kwuSEF0Y
5yLMf1TKfVkeBaZ4tOqR2kzpggzPPog+JcfIQgVmI2QTUDritHWFIM4YUwQ/00WU
uol2BCUpgaLci5gNgyTw8p538Q5cZHXE3kK/CWJA4zKag+oHdmXzGjMalqzvPuVJ
9VdtPrwHhB0Xntf72iWWhE2dIn1QZqVmJ/8hhIU8cQ91pIqTjYjhrYE/GhGH7HMW
bRiRolt37VxbzfXjEBMqVH6fOQq0piTRxwTNPBFp6JO5mRakRmWRvN3dnR8J9qXi
6tQhNNn2uQIpPlKlqVQnh5j5YxFrb50b0FCjDw+eNilXP93yjV4+lWK2QZychcGl
6/7Wu8snZkJCImPbwmcT80XSKesf918zIkauekWiaJE02+ljNtbM7MUAE+XLsKJy
NFGzpyZJ9LihGC/eeVl7K+xqC41jGVOXOOHtbDMbIQfaEZd1nPvy3+V/tublv+am
jPSlj/FW3bLTkjF0OspFjHvJeCeAJdM9kJdYfZoahd6kcejGJc+vjXE=
-----END CERTIFICATE-----

39
tests/ssl_cert.key Normal file
View file

@ -0,0 +1,39 @@
-----BEGIN RSA PRIVATE KEY-----
MIIG4wIBAAKCAYEAxyFVeRsWnb1UlCKBks2azM9W9K+J/ZkzdSb6eCxOIxv79M/U
g54CfWqkySSaQejsu0U/gJxkFYRvwQAy5lATrspY2kyiWYiggWXFDWz+i8ETPkL9
zn59v13sNIpT/IXQj0S3Mr9ZnsUn1qCyEOOIxJxZlyuV/M/XP1DP4tArhEvrex12
V6MQIK+8fYzEjHG/W7vIIet+wTStIR8ArvVQi0KvPbbGCfrZ+e+gq+UpBLBuAfMz
M95TW+YJ5duMchie2n6LDmOeegA4jMEv2ppeOr8QJJtZuKpXWVbJvLg81yrDjr1r
AwJR/WQrnk8GQWPCyPLneAA4mJbi75LqjLxn0AoJb3kzLfGEMJJEWXspxNg06bLQ
U948hB4L7nKARq6s7KoESjEV+/L4koMPWJoNq6fxOUVw2+S3ITNrDctecRQ1j3RG
VPaj5l6bn03C7KV9uRrfqFY3OUjn7A0kDczvRnmre1BZIpe+mfGFB+Uu7JiQoBv6
I6fqyrdH9rX1LUKlAgMBAAECggGAEogMn0ehFC7xdxO7AUF3HYZSLlVDv0EJo+Zr
utFMuEG7ce4Bdfo3exp4mWt5m5akqUzpevuS6Nm5WLm/AuYC3upf2Hj3RuPLJB+n
dfdlvPXL56huXFAzPaLs/3q8FC0T2rFnZyadnYP1kCjGSYITUVDHmaTpwWxKOM85
eX8r/ZTfJkb4o3E+Z/xSy1BVXkibqVrRZi63Th2r2wA6nQ2hYERlcJXY2kbpEDR3
vGeIKLKOmknawwH2uf+vfh+vc1LNE7p9C5w16ex0OcmCo6G1ln7/dcwmXmcS3M0S
Bax5Jzu5ozaJFL9G59o0AUGJoZj9Gj9leeKPZvShsGcA0JmBMQiLIdhgRwj0B83x
HrYXTZ6P5BjJmwrIv4mGdv2bHV20pbWKAATUwo8EVBzylipexhhAtQJ5B6OsPDPS
HTluaEC2niD6lE613uRnzzbjw4SlwkoMLE0aqOhQyWIPS9/8oRjTzQi4otL7Dt69
oMrVhmSfxUqZhh2R3KMHDcMKt5nBAoHBAOXkDovYOhTMD3ei0WbKpbSB1sJp5t2d
/9gVil4nWLa4ahw7/TsZi3Co+c9gD2UJku1L9JbOy6TVZ2LoXOybLvIJfeAjNdYH
vi/ElG7498fgsSyw6bua/1VEd7VtbtpWJIQt1LdJG1+O3ZbJNTY6tbLbYVuy4FIO
e/484F8kdZ9PtRsn+I0I7kfoYJ2IFoM0UWgwQETOBguBCua43ZnHoxrvyHKABAO+
Iuvw4RBZKphGVxMCEjvTCB9S/CpGCRAkkQKBwQDdvu3reA/lVdFDN56VNUn0u3vr
zPSoiOjojlHDyWVAWiLB9I0qaE61UMvVgChM8VkmjhHYQEW6Cj0XMZMkCnsfKDQn
TYF16jt/sTteWSTcx0PTeiCGs3yM5wK4B8q9coOlzSqDd39mjDIFiUz4e+44OIcU
+ISc8pGbwxw0W8qRwIUJPTSVoaUZDnupuR/IE48q8CTPT1Gf00sMLWuv3SYuFHKX
djpcMLWVf4HclIY6y3BqNIZ0JaUAOd+OZT2kdtUCgcBLWPwLics/lcJcC9lmP3Ug
PI4PGna4nFiGkkjPo0XIXZkpt9+/xxeUzU1TUsC49PJbJFH+O7kzRV6lZFNQmWxB
mCrRk7jJdbA4J84esStFL7fiVfnFq3+UiuRRapSyqxk82WimyidWopSuHzR5mbSD
8rNuQqqTOnwZUAqaJHEIzi8lv2wPjaXLm7ZO65O1XShxZZ8q7fu9OYZBKMY46N3k
rkKchKjMMT1w53pcyVzUm/leGYewY/J9kc1kbZ/60oECgcEAj/qdzwt4/sa3BncB
wA4GxCJL9zJwFVI4MG/gRUjqNluQP/GDC2sI2A/rGeiJwlPfN/p9ObWZ0I8/VWT6
DifEA9n96xsXGTIKigHQ85TcK4Iy1whwQCYgk/iXOljM2i+VrT1HAm+/yBz1icS5
ton5hoWlqAcpTCLwSnvoP1Lud67ScspL73Aym89cmjo6mZWhmxasP/NXo3f1PaXs
SxdD6B2cvh2lDSEPdk+BSXEiquBXUI5kUtvyg/AP6Qxxdu01AoHAO05qTh9zokkT
yg0sZf4Z5i01em2ys4ZhQjhhbw+I5lIO76e/ZyUWpEZusBVd9TV5BHgiATOHw4yr
nbjEZKwLEb3SXoHl3/CD/l9vWk4gKAYDJdW+oPZttDlkp6dfPJVDupQwLhrxXYmE
fgs4WFmY3Q5b1wut2pnSs1UEPDqJBvykt59gFgn7yVwyTy8VLihNVtH4mwVPYXha
jz2T6BzRAPlYqx/FpkK2YHHNcyj+HFtnBUMMzacnSl/aXpJgHTKw
-----END RSA PRIVATE KEY-----

View file

@ -229,7 +229,7 @@ def test_https_connection(self):
https_url = 'https://localhost:' + str(port) + '/' + relative_target_filepath
# Download the target file using an https connection.
tuf.conf.ssl_certificates = 'https_client.pem'
tuf.conf.ssl_certificates = 'ssl_cert.crt'
message = 'Downloading target file from https server: ' + https_url
logger.info(message)
try:

View file

@ -282,7 +282,7 @@ def test_with_tuf(self):
except tuf.NoWorkingMirrorError as exception:
for mirror_url, mirror_error in six.iteritems(exception.mirror_errors):
self.assertTrue(isinstance(mirror_error, tuf.InvalidMetadataJSONError))
self.assertTrue(isinstance(mirror_error, tuf.Error))
else:
self.fail('TUF did not prevent an endless data attack.')

View file

@ -214,8 +214,8 @@ def test_with_tuf(self):
try:
self.repository_updater.targets_of_role('targets/role1')
# Verify that the specific 'tuf.BadHashError' exception is raised by each
# mirror.
# Verify that the specific 'tuf.ForbiddenTargetError' exception is raised
# by each mirror.
except tuf.NoWorkingMirrorError as exception:
for mirror_url, mirror_error in six.iteritems(exception.mirror_errors):
url_prefix = self.repository_mirrors['mirror1']['url_prefix']
@ -223,7 +223,7 @@ def test_with_tuf(self):
# Verify that 'role1.json' is the culprit.
self.assertEqual(url_file, mirror_url)
self.assertTrue(isinstance(mirror_error, tuf.BadHashError))
self.assertTrue(isinstance(mirror_error, tuf.ForbiddenTargetError))
else:
self.fail('TUF did not prevent an extraneous dependencies attack.')

View file

@ -197,6 +197,7 @@ def test_schemas(self):
{'_type': 'Root',
'version': 8,
'consistent_snapshot': False,
'compression_algorithms': ['gz'],
'expires': '1985-10-21T13:20:00Z',
'keys': {'123abc': {'keytype': 'rsa',
'keyval': {'public': 'pubkey',
@ -223,17 +224,13 @@ def test_schemas(self):
{'_type': 'Snapshot',
'version': 8,
'expires': '1985-10-21T13:20:00Z',
'meta': {'metadata/snapshot.json': {'length': 1024,
'hashes': {'sha256': 'ABCD123'},
'custom': {'type': 'metadata'}}}}),
'meta': {'metadata/snapshot.json': {'version': 1024}}}),
'TIMESTAMP_SCHEMA': (tuf.formats.TIMESTAMP_SCHEMA,
{'_type': 'Timestamp',
'version': 8,
'expires': '1985-10-21T13:20:00Z',
'meta': {'metadata/timestamp.json': {'length': 1024,
'hashes': {'sha256': 'ABCD123'},
'custom': {'type': 'metadata'}}}}),
'meta': {'metadata/timestamp.json': {'version': 1024}}}),
'MIRROR_SCHEMA': (tuf.formats.MIRROR_SCHEMA,
{'url_prefix': 'http://localhost:8001',
@ -303,29 +300,27 @@ def test_TimestampFile(self):
# Test conditions for valid instances of 'tuf.formats.TimestampFile'.
version = 8
expires = '1985-10-21T13:20:00Z'
filedict = {'metadata/timestamp.json': {'length': 1024,
'hashes': {'sha256': 'ABCD123'},
'custom': {'type': 'metadata'}}}
versiondict = {'targets.json': {'version': version}}
make_metadata = tuf.formats.TimestampFile.make_metadata
from_metadata = tuf.formats.TimestampFile.from_metadata
TIMESTAMP_SCHEMA = tuf.formats.TIMESTAMP_SCHEMA
self.assertTrue(TIMESTAMP_SCHEMA.matches(make_metadata(version, expires,
filedict)))
metadata = make_metadata(version, expires, filedict)
versiondict)))
metadata = make_metadata(version, expires, versiondict)
self.assertTrue(isinstance(from_metadata(metadata), tuf.formats.TimestampFile))
# Test conditions for invalid arguments.
bad_version = 'eight'
bad_expires = '2000'
bad_filedict = 123
bad_versiondict = 123
self.assertRaises(tuf.FormatError, make_metadata, bad_version,
expires, filedict)
expires, versiondict)
self.assertRaises(tuf.FormatError, make_metadata, version,
bad_expires, filedict)
bad_expires, versiondict)
self.assertRaises(tuf.FormatError, make_metadata, version,
expires, bad_filedict)
expires, bad_versiondict)
self.assertRaises(tuf.FormatError, from_metadata, 123)
@ -345,6 +340,8 @@ def test_RootFile(self):
roledict = {'root': {'keyids': ['123abc'],
'threshold': 1,
'paths': ['path1/', 'path2']}}
compression_algorithms = ['gz']
make_metadata = tuf.formats.RootFile.make_metadata
from_metadata = tuf.formats.RootFile.from_metadata
@ -352,9 +349,10 @@ def test_RootFile(self):
self.assertTrue(ROOT_SCHEMA.matches(make_metadata(version, expires,
keydict, roledict,
consistent_snapshot)))
consistent_snapshot,
compression_algorithms)))
metadata = make_metadata(version, expires, keydict, roledict,
consistent_snapshot)
consistent_snapshot, compression_algorithms)
self.assertTrue(isinstance(from_metadata(metadata), tuf.formats.RootFile))
# Test conditions for invalid arguments.
@ -362,23 +360,28 @@ def test_RootFile(self):
bad_expires = 'eight'
bad_keydict = 123
bad_roledict = 123
bad_compression_algorithms = ['nozip']
self.assertRaises(tuf.FormatError, make_metadata, bad_version,
expires,
keydict, roledict,
consistent_snapshot)
consistent_snapshot,
compression_algorithms)
self.assertRaises(tuf.FormatError, make_metadata, version,
bad_expires,
keydict, roledict,
consistent_snapshot)
consistent_snapshot,
compression_algorithms)
self.assertRaises(tuf.FormatError, make_metadata, version,
expires,
bad_keydict, roledict,
consistent_snapshot)
consistent_snapshot,
compression_algorithms)
self.assertRaises(tuf.FormatError, make_metadata, version,
expires,
keydict, bad_roledict,
consistent_snapshot)
consistent_snapshot,
compression_algorithms)
self.assertRaises(tuf.FormatError, from_metadata, 'bad')
@ -388,30 +391,27 @@ def test_SnapshotFile(self):
# Test conditions for valid instances of 'tuf.formats.SnapshotFile'.
version = 8
expires = '1985-10-21T13:20:00Z'
filedict = {'metadata/snapshot.json': {'length': 1024,
'hashes': {'sha256': 'ABCD123'},
'custom': {'type': 'metadata'}}}
versiondict = {'targets.json' : {'version': version}}
make_metadata = tuf.formats.SnapshotFile.make_metadata
from_metadata = tuf.formats.SnapshotFile.from_metadata
SNAPSHOT_SCHEMA = tuf.formats.SNAPSHOT_SCHEMA
self.assertTrue(SNAPSHOT_SCHEMA.matches(make_metadata(version, expires,
filedict)))
metadata = make_metadata(version, expires, filedict)
versiondict)))
metadata = make_metadata(version, expires, versiondict)
self.assertTrue(isinstance(from_metadata(metadata), tuf.formats.SnapshotFile))
# Test conditions for invalid arguments.
bad_version = '8'
bad_expires = '2000'
bad_filedict = 123
bad_versiondict = 123
self.assertRaises(tuf.FormatError, make_metadata, version,
expires, bad_filedict)
expires, bad_versiondict)
self.assertRaises(tuf.FormatError, make_metadata, bad_version, expires,
filedict)
versiondict)
self.assertRaises(tuf.FormatError, make_metadata, version, bad_expires,
bad_filedict)
bad_versiondict)
self.assertRaises(tuf.FormatError, from_metadata, 123)
@ -548,6 +548,7 @@ def test_make_signable(self):
root = {'_type': 'Root',
'version': 8,
'consistent_snapshot': False,
'compression_algorithms': ['gz'],
'expires': '1985-10-21T13:20:00Z',
'keys': {'123abc': {'keytype': 'rsa',
'keyval': {'public': 'pubkey',
@ -679,6 +680,7 @@ def test_check_signable_object_format(self):
root = {'_type': 'Root',
'version': 8,
'consistent_snapshot': False,
'compression_algorithms': ['gz'],
'expires': '1985-10-21T13:20:00Z',
'keys': {'123abc': {'keytype': 'rsa',
'keyval': {'public': 'pubkey',

View file

@ -190,11 +190,13 @@ def test_create_keydb_from_root_metadata(self):
version = 8
consistent_snapshot = False
expires = '1985-10-21T01:21:00Z'
compression_algorithms = ['gz']
root_metadata = tuf.formats.RootFile.make_metadata(version,
expires,
keydict, roledict,
consistent_snapshot)
consistent_snapshot,
compression_algorithms)
self.assertEqual(None, tuf.keydb.create_keydb_from_root_metadata(root_metadata))
tuf.keydb.create_keydb_from_root_metadata(root_metadata)
@ -230,11 +232,13 @@ def test_create_keydb_from_root_metadata(self):
keydict[keyid3] = rsakey3
version = 8
expires = '1985-10-21T01:21:00Z'
compression_algorithms = ['gz']
root_metadata = tuf.formats.RootFile.make_metadata(version,
expires,
keydict, roledict,
consistent_snapshot)
consistent_snapshot,
compression_algorithms)
self.assertEqual(None, tuf.keydb.create_keydb_from_root_metadata(root_metadata))
# Ensure only 'keyid2' was added to the keydb database. 'keyid' and

View file

@ -239,16 +239,16 @@ def test_with_tuf(self):
try:
self.repository_updater.targets_of_role('targets/role1')
# Verify that the specific 'tuf.BadHashError' exception is raised by each
# mirror.
# Verify that the specific 'tuf.BadVersionNumberError' exception is raised
# by each mirror.
except tuf.NoWorkingMirrorError as exception:
for mirror_url, mirror_error in six.iteritems(exception.mirror_errors):
url_prefix = self.repository_mirrors['mirror1']['url_prefix']
url_file = os.path.join(url_prefix, 'metadata', 'targets', 'role1.json')
# Verify that 'timestamp.json' is the culprit.
# Verify that 'role1.json' is the culprit.
self.assertEqual(url_file, mirror_url)
self.assertTrue(isinstance(mirror_error, tuf.BadHashError))
self.assertTrue(isinstance(mirror_error, tuf.BadVersionNumberError))
else:
self.fail('TUF did not prevent a mix-and-match attack.')

View file

@ -289,7 +289,7 @@ def test_with_tuf(self):
# version.
repository.timestamp.expiration = datetime.datetime(2030, 1, 1, 12, 12)
repository.write()
# Move the staged metadata to the "live" metadata.
shutil.rmtree(os.path.join(self.repository_directory, 'metadata'))
shutil.copytree(os.path.join(self.repository_directory, 'metadata.staged'),
@ -315,6 +315,7 @@ def test_with_tuf(self):
# Restore the previous version of 'timestamp.json' on the remote repository
# and verify that the non-TUF client downloads it (expected, but not ideal).
shutil.move(backup_timestamp, timestamp_path)
logger.info('Moving the timestamp.json backup to the current version.')
# Verify that the TUF client detects replayed metadata and refuses to
# continue the update process.

View file

@ -49,6 +49,7 @@
import tuf.keydb
import tuf.hash
import tuf.repository_lib as repo_lib
import tuf.repository_tool as repo_tool
import six
@ -429,7 +430,7 @@ def test_generate_root_metadata(self):
expires = '1985-10-21T01:22:00Z'
root_metadata = repo_lib.generate_root_metadata(1, expires,
consistent_snapshot=False)
consistent_snapshot=False)
self.assertTrue(tuf.formats.ROOT_SCHEMA.matches(root_metadata))
@ -547,7 +548,6 @@ def test_generate_targets_metadata(self):
def test_generate_snapshot_metadata(self):
# Test normal case.
temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory)
@ -557,17 +557,27 @@ def test_generate_snapshot_metadata(self):
shutil.copytree(original_repository_path, repository_directory)
metadata_directory = os.path.join(repository_directory,
repo_lib.METADATA_STAGED_DIRECTORY_NAME)
targets_directory = os.path.join(repository_directory, repo_lib.TARGETS_DIRECTORY_NAME)
root_filename = os.path.join(metadata_directory, repo_lib.ROOT_FILENAME)
targets_filename = os.path.join(metadata_directory,
repo_lib.TARGETS_FILENAME)
version = 1
expiration_date = '1985-10-21T13:20:00Z'
# Load a valid repository so that top-level roles exist in roledb and
# generate_snapshot_metadata() has roles to specify in snapshot metadata.
repository = repo_tool.Repository(repository_directory, metadata_directory,
targets_directory)
repository_junk = repo_tool.load_repository(repository_directory)
root_filename = 'root'
targets_filename = 'targets'
snapshot_metadata = \
repo_lib.generate_snapshot_metadata(metadata_directory, version,
expiration_date, root_filename,
targets_filename,
consistent_snapshot=False)
expiration_date, root_filename,
targets_filename,
consistent_snapshot=False)
self.assertTrue(tuf.formats.SNAPSHOT_SCHEMA.matches(snapshot_metadata))
@ -609,25 +619,20 @@ def test_generate_timestamp_metadata(self):
version = 1
expiration_date = '1985-10-21T13:20:00Z'
compressions = ['gz']
snapshot_metadata = \
repo_lib.generate_timestamp_metadata(snapshot_filename, version,
expiration_date, compressions)
expiration_date)
self.assertTrue(tuf.formats.TIMESTAMP_SCHEMA.matches(snapshot_metadata))
# Test improperly formatted arguments.
self.assertRaises(tuf.FormatError, repo_lib.generate_timestamp_metadata,
3, version, expiration_date, compressions)
3, version, expiration_date)
self.assertRaises(tuf.FormatError, repo_lib.generate_timestamp_metadata,
snapshot_filename, '3', expiration_date, compressions)
snapshot_filename, '3', expiration_date)
self.assertRaises(tuf.FormatError, repo_lib.generate_timestamp_metadata,
snapshot_filename, version, '3', compressions)
self.assertRaises(tuf.FormatError, repo_lib.generate_timestamp_metadata,
snapshot_filename, version, expiration_date, 3)
self.assertRaises(tuf.FormatError, repo_lib.generate_timestamp_metadata,
snapshot_filename, version, expiration_date, ['compress'])
snapshot_filename, version, '3')
@ -693,24 +698,31 @@ def test_write_metadata_file(self):
root_signable = tuf.util.load_json_file(root_filename)
output_filename = os.path.join(temporary_directory, 'root.json')
compressions = ['gz']
compression_algorithms = ['gz']
version_number = root_signable['signed']['version'] + 1
self.assertFalse(os.path.exists(output_filename))
repo_lib.write_metadata_file(root_signable, output_filename, compressions,
consistent_snapshot=False)
repo_lib.write_metadata_file(root_signable, output_filename,
version_number,
compression_algorithms,
consistent_snapshot=False)
self.assertTrue(os.path.exists(output_filename))
self.assertTrue(os.path.exists(output_filename + '.gz'))
# Test improperly formatted arguments.
self.assertRaises(tuf.FormatError, repo_lib.write_metadata_file,
3, output_filename, compressions, False)
3, output_filename, version_number,
compression_algorithms, False)
self.assertRaises(tuf.FormatError, repo_lib.write_metadata_file,
root_signable, 3, compressions, False)
root_signable, 3, version_number, compression_algorithms,
False)
self.assertRaises(tuf.FormatError, repo_lib.write_metadata_file,
root_signable, output_filename, 3, False)
root_signable, output_filename, '3',
compression_algorithms, False)
self.assertRaises(tuf.FormatError, repo_lib.write_metadata_file,
root_signable, output_filename, compressions, 3)
root_signable, output_filename, version_number,
compression_algorithms, 3)

View file

@ -326,7 +326,7 @@ def test_write_and_write_partial(self):
repository.snapshot.load_signing_key(snapshot_privkey)
# Verify that consistent snapshot can be written and loaded.
repository.write(consistent_snapshot=True)
repository.write(consistent_snapshot=True)
repo_tool.load_repository(repository_directory)
# Test improperly formatted arguments.

View file

@ -329,12 +329,14 @@ def test_create_roledb_from_root_metadata(self):
'targets': {'keyids': [keyid2], 'threshold': 1}}
version = 8
consistent_snapshot = False
expires = '1985-10-21T01:21:00Z'
expires = '1985-10-21T01:21:00Z'
compression_algorithms = ['gz']
root_metadata = tuf.formats.RootFile.make_metadata(version,
expires,
keydict, roledict,
consistent_snapshot)
consistent_snapshot,
compression_algorithms)
self.assertEqual(None,
tuf.roledb.create_roledb_from_root_metadata(root_metadata))
# Ensure 'Root' and 'Targets' were added to the role database.
@ -372,7 +374,8 @@ def test_create_roledb_from_root_metadata(self):
root_metadata = tuf.formats.RootFile.make_metadata(version,
expires,
keydict, roledict,
consistent_snapshot)
consistent_snapshot,
compression_algorithms)
self.assertRaises(tuf.Error,
tuf.roledb.create_roledb_from_root_metadata, root_metadata)
# Remove the invalid role and re-generate 'root_metadata' to test for the
@ -381,7 +384,8 @@ def test_create_roledb_from_root_metadata(self):
root_metadata = tuf.formats.RootFile.make_metadata(version,
expires,
keydict, roledict,
consistent_snapshot)
consistent_snapshot,
compression_algorithms)
self.assertEqual(None,
tuf.roledb.create_roledb_from_root_metadata(root_metadata))

View file

@ -349,33 +349,40 @@ def test_1__rebuild_key_and_role_db(self):
def test_1__update_fileinfo(self):
def test_1__update_versioninfo(self):
# Tests
# Verify that the 'self.fileinfo' dictionary is empty (its starts off empty
# and is only populated if _update_fileinfo() is called.
fileinfo_dict = self.repository_updater.fileinfo
self.assertEqual(len(fileinfo_dict), 0)
# Verify that the 'self.versioninfo' dictionary is empty (it starts off
# empty and is only populated if _update_versioninfo() is called.
versioninfo_dict = self.repository_updater.versioninfo
self.assertEqual(len(versioninfo_dict), 0)
# Load the fileinfo of the top-level root role. This populates the
# 'self.fileinfo' dictionary.
self.repository_updater._update_fileinfo('root.json')
self.assertEqual(len(fileinfo_dict), 1)
self.assertTrue(tuf.formats.FILEDICT_SCHEMA.matches(fileinfo_dict))
root_filepath = os.path.join(self.client_metadata_current, 'root.json')
length, hashes = tuf.util.get_file_details(root_filepath)
root_fileinfo = tuf.formats.make_fileinfo(length, hashes)
self.assertTrue('root.json' in fileinfo_dict)
self.assertEqual(fileinfo_dict['root.json'], root_fileinfo)
# Load the versioninfo of the top-level Root role. This action populates
# the 'self.versioninfo' dictionary.
self.repository_updater._update_versioninfo('root.json')
self.assertEqual(len(versioninfo_dict), 1)
self.assertTrue(tuf.formats.VERSIONDICT_SCHEMA.matches(versioninfo_dict))
# The Snapshot role stores the version numbers of all the roles available
# on the repository. Load Snapshot to extract Root's version number
# and compare it against the one loaded by 'self.repository_updater'.
snapshot_filepath = os.path.join(self.client_metadata_current, 'snapshot.json')
snapshot_signable = tuf.util.load_json_file(snapshot_filepath)
root_versioninfo = snapshot_signable['signed']['meta']['root.json']
# Verify that the manually loaded version number of root.json matches
# the one loaded by the updater object.
self.assertTrue('root.json' in versioninfo_dict)
self.assertEqual(versioninfo_dict['root.json'], root_versioninfo)
# Verify that 'self.fileinfo' is incremented if another role is updated.
self.repository_updater._update_fileinfo('targets.json')
self.assertEqual(len(fileinfo_dict), 2)
# Verify that 'self.versioninfo' is incremented if another role is updated.
self.repository_updater._update_versioninfo('targets.json')
self.assertEqual(len(versioninfo_dict), 2)
# Verify that 'self.fileinfo' is inremented if a non-existent role is
# requested, and has its fileinfo entry set to 'None'.
self.repository_updater._update_fileinfo('bad_role.json')
self.assertEqual(len(fileinfo_dict), 3)
self.assertEqual(fileinfo_dict['bad_role.json'], None)
# Verify that 'self.versioninfo' is incremented if a non-existent role is
# requested, and has its versioninfo entry set to 'None'.
self.repository_updater._update_versioninfo('bad_role.json')
self.assertEqual(len(versioninfo_dict), 3)
self.assertEqual(versioninfo_dict['bad_role.json'], None)
@ -458,24 +465,20 @@ def test_2__import_delegations(self):
def test_2__fileinfo_has_changed(self):
# Verify that the method returns 'False' if file info was not changed.
root_filepath = os.path.join(self.client_metadata_current, 'root.json')
length, hashes = tuf.util.get_file_details(root_filepath)
root_fileinfo = tuf.formats.make_fileinfo(length, hashes)
self.assertFalse(self.repository_updater._fileinfo_has_changed('root.json',
root_fileinfo))
def test_2__versioninfo_has_changed(self):
# Verify that the method returns 'False' if a versioninfo was not changed.
snapshot_filepath = os.path.join(self.client_metadata_current, 'snapshot.json')
snapshot_signable = tuf.util.load_json_file(snapshot_filepath)
root_versioninfo = snapshot_signable['signed']['meta']['root.json']
self.assertFalse(self.repository_updater._versioninfo_has_changed('root.json',
root_versioninfo))
# Verify that the method returns 'True' if Root's version number changes.
root_versioninfo['version'] = 8
self.assertTrue(self.repository_updater._versioninfo_has_changed('root.json',
root_versioninfo))
# Verify that the method returns 'True' if length or hashes were changed.
new_length = 8
new_root_fileinfo = tuf.formats.make_fileinfo(new_length, hashes)
self.assertTrue(self.repository_updater._fileinfo_has_changed('root.json',
new_root_fileinfo))
# Hashes were changed.
new_hashes = {'sha256': self.random_string()}
new_root_fileinfo = tuf.formats.make_fileinfo(length, new_hashes)
self.assertTrue(self.repository_updater._fileinfo_has_changed('root.json',
new_root_fileinfo))
@ -543,24 +546,23 @@ def test_3__update_metadata(self):
# This is the default metadata that we would create for the timestamp role,
# because it has no signed metadata for itself.
DEFAULT_TIMESTAMP_FILEINFO = {
'hashes': {},
'length': tuf.conf.DEFAULT_TIMESTAMP_REQUIRED_LENGTH
}
# Save the fileinfo of 'targets.json' and 'targets.json.gz', needed later
# when re-installing with _update_metadata().
targets_fileinfo = \
DEFAULT_TIMESTAMP_FILELENGTH = tuf.conf.DEFAULT_TIMESTAMP_REQUIRED_LENGTH
# This is the the upper bound length for Targets metadata.
DEFAULT_TARGETS_FILELENGTH = tuf.conf.DEFAULT_TARGETS_REQUIRED_LENGTH
# Save the versioninfo of 'targets.json,' needed later when re-installing
# with _update_metadata().
targets_versioninfo = \
self.repository_updater.metadata['current']['snapshot']['meta']\
['targets.json']
targets_compressed_fileinfo = \
self.repository_updater.metadata['current']['snapshot']['meta']\
['targets.json.gz']
# Remove the currently installed metadata from the store, and disk. Verify
# Remove the currently installed metadata from the store and disk. Verify
# that the metadata dictionary is re-populated after calling
# _update_metadata().
self.repository_updater.metadata['current'].clear()
del self.repository_updater.metadata['current']['timestamp']
del self.repository_updater.metadata['current']['targets']
timestamp_filepath = \
os.path.join(self.client_metadata_current, 'timestamp.json')
targets_filepath = os.path.join(self.client_metadata_current, 'targets.json')
@ -572,16 +574,20 @@ def test_3__update_metadata(self):
# Verify 'timestamp.json' is properly installed.
self.assertFalse('timestamp' in self.repository_updater.metadata)
self.repository_updater._update_metadata('timestamp',
DEFAULT_TIMESTAMP_FILEINFO)
DEFAULT_TIMESTAMP_FILELENGTH)
self.assertTrue('timestamp' in self.repository_updater.metadata['current'])
os.path.exists(timestamp_filepath)
# Verify 'targets.json' is properly installed.
self.assertFalse('targets' in self.repository_updater.metadata['current'])
self.repository_updater._update_metadata('targets', targets_fileinfo)
self.repository_updater._update_metadata('targets',
DEFAULT_TARGETS_FILELENGTH,
targets_versioninfo['version'])
self.assertTrue('targets' in self.repository_updater.metadata['current'])
length, hashes = tuf.util.get_file_details(targets_filepath)
self.assertEqual(targets_fileinfo, tuf.formats.make_fileinfo(length, hashes))
targets_signable = tuf.util.load_json_file(targets_filepath)
loaded_targets_version = targets_signable['signed']['version']
self.assertEqual(targets_versioninfo['version'], loaded_targets_version)
# Remove the 'targets.json' metadata so that the compressed version may be
# tested next.
@ -591,45 +597,47 @@ def test_3__update_metadata(self):
# Verify 'targets.json.gz' is properly intalled. Note: The uncompressed
# version is installed if the compressed one is downloaded.
self.assertFalse('targets' in self.repository_updater.metadata['current'])
self.repository_updater._update_metadata('targets', targets_fileinfo, 'gzip',
targets_compressed_fileinfo)
self.repository_updater._update_metadata('targets',
DEFAULT_TARGETS_FILELENGTH,
targets_versioninfo['version'],
'gzip')
self.assertTrue('targets' in self.repository_updater.metadata['current'])
length, hashes = tuf.util.get_file_details(targets_filepath)
self.assertEqual(targets_fileinfo, tuf.formats.make_fileinfo(length, hashes))
self.assertEqual(targets_versioninfo['version'],
self.repository_updater.metadata['current']['targets']['version'])
# Test: Invalid fileinfo.
# Invalid fileinfo for the uncompressed version of 'targets.json'.
# Test: Invalid / untrusted version numbers.
# Invalid version number for the uncompressed version of 'targets.json'.
self.assertRaises(tuf.NoWorkingMirrorError,
self.repository_updater._update_metadata,
'targets', targets_compressed_fileinfo)
'targets', DEFAULT_TARGETS_FILELENGTH, 88)
# Verify that the specific exception raised is correct for the previous
# case.
try:
self.repository_updater._update_metadata('targets',
targets_compressed_fileinfo)
DEFAULT_TARGETS_FILELENGTH, 88)
except tuf.NoWorkingMirrorError as e:
for mirror_error in six.itervalues(e.mirror_errors):
assert isinstance(mirror_error, tuf.BadHashError)
assert isinstance(mirror_error, tuf.BadVersionNumberError)
# Invalid fileinfo for the compressed version of 'targets.json'
# Invalid version number for the compressed version of 'targets.json'
self.assertRaises(tuf.NoWorkingMirrorError,
self.repository_updater._update_metadata,
'targets', targets_compressed_fileinfo, 'gzip',
targets_fileinfo)
'targets', DEFAULT_TARGETS_FILELENGTH, 88,
'gzip')
# Verify that the specific exception raised is correct for the previous
# case. The length is checked before the hashes, so the specific error in
# this case should be 'tuf.DownloadLengthMismatchError'.
# case. The version number is checked, so the specific error in
# this case should be 'tuf.BadVersionNumberError'.
try:
self.repository_updater._update_metadata('targets',
targets_compressed_fileinfo,
'gzip', targets_fileinfo)
DEFAULT_TARGETS_FILELENGTH,
88, 'gzip')
except tuf.NoWorkingMirrorError as e:
for mirror_error in six.itervalues(e.mirror_errors):
assert isinstance(mirror_error, tuf.DownloadLengthMismatchError)
assert isinstance(mirror_error, tuf.BadVersionNumberError)
@ -653,7 +661,6 @@ def test_3__update_metadata_if_changed(self):
# Verify the current version of 'targets.json' has not changed.
self.assertEqual(self.repository_updater.metadata['current']['targets']['version'], 1)
# Modify one target file on the remote repository.
repository = repo_tool.load_repository(self.repository_directory)
target3 = os.path.join(self.repository_directory, 'targets', 'file3.txt')
@ -669,16 +676,12 @@ def test_3__update_metadata_if_changed(self):
shutil.copytree(os.path.join(self.repository_directory, 'metadata.staged'),
os.path.join(self.repository_directory, 'metadata'))
# Update 'targets.json' and verify that the client's current 'targets.json'
# has been updated. 'timestamp' and 'snapshot' must be manually updated
# so that new 'targets' may be recognized.
DEFAULT_TIMESTAMP_FILEINFO = {
'hashes': {},
'length': tuf.conf.DEFAULT_TIMESTAMP_REQUIRED_LENGTH
}
# so that new 'targets' can be recognized.
DEFAULT_TIMESTAMP_FILELENGTH = tuf.conf.DEFAULT_TIMESTAMP_REQUIRED_LENGTH
self.repository_updater._update_metadata('timestamp', DEFAULT_TIMESTAMP_FILEINFO)
self.repository_updater._update_metadata('timestamp', DEFAULT_TIMESTAMP_FILELENGTH)
self.repository_updater._update_metadata_if_changed('snapshot', 'timestamp')
self.repository_updater._update_metadata_if_changed('targets')
targets_path = os.path.join(self.client_metadata_current, 'targets.json')
@ -1017,68 +1020,66 @@ def test_6_download_target(self):
target_filepaths = \
list(self.repository_updater.metadata['current']['targets']['targets'].keys())
# Test: normal case.
# Get the target info, which is an argument to 'download_target()'.
for target_filepath in target_filepaths:
target_fileinfo = self.repository_updater.target(target_filepath)
self.repository_updater.download_target(target_fileinfo,
destination_directory)
# 'target_filepaths' is expected to have at least two targets. The first
# target will be used to test against download_target(). The second
# will be used to test against download_target() and a repository with
# 'consistent_snapshot' set to True.
target_filepath1 = target_filepaths.pop()
target_fileinfo = self.repository_updater.target(target_filepath1)
self.repository_updater.download_target(target_fileinfo,
destination_directory)
download_filepath = \
os.path.join(destination_directory, target_filepath.lstrip('/'))
self.assertTrue(os.path.exists(download_filepath))
length, hashes = tuf.util.get_file_details(download_filepath)
download_targetfileinfo = tuf.formats.make_fileinfo(length, hashes)
download_filepath = \
os.path.join(destination_directory, target_filepath1.lstrip('/'))
self.assertTrue(os.path.exists(download_filepath))
length, hashes = tuf.util.get_file_details(download_filepath)
download_targetfileinfo = tuf.formats.make_fileinfo(length, hashes)
# Add any 'custom' data from the repository's target fileinfo to the
# 'download_targetfileinfo' object being tested.
if 'custom' in target_fileinfo['fileinfo']:
download_targetfileinfo['custom'] = target_fileinfo['fileinfo']['custom']
self.assertEqual(target_fileinfo['fileinfo'], download_targetfileinfo)
# Test when consistent snapshots is set. First, create a valid
# repository with consistent snapshots set (root.json contains a
# "consistent_snapshot" entry that the updater uses to correctly fetch
# snapshots. The updater expects the existence of
# '<version_number>.filename' files if root.json sets 'consistent_snapshot
# = True'.
# The repository must be rewritten with 'consistent_snapshot' set.
repository = repo_tool.load_repository(self.repository_directory)
repository.root.load_signing_key(self.role_keys['root']['private'])
repository.snapshot.load_signing_key(self.role_keys['snapshot']['private'])
repository.timestamp.load_signing_key(self.role_keys['timestamp']['private'])
# Add any 'custom' data from the repository's target fileinfo to the
# 'download_targetfileinfo' object being tested.
if 'custom' in target_fileinfo['fileinfo']:
download_targetfileinfo['custom'] = target_fileinfo['fileinfo']['custom']
self.assertEqual(target_fileinfo['fileinfo'], download_targetfileinfo)
# Test when consistent snapshots is set. First, create a valid
# repository with consistent snapshots set (root.json contains a
# "consistent_snapshots" entry that the updater uses to correctly fetch
# snapshots. The updater expects the existence of <hash>.filename files
# if root.json sets 'consistent_snapshot = True'.
# The repository must be rewritten with consistent snapshots set.
repository = repo_tool.load_repository(self.repository_directory)
repository.root.load_signing_key(self.role_keys['root']['private'])
repository.snapshot.load_signing_key(self.role_keys['snapshot']['private'])
repository.timestamp.load_signing_key(self.role_keys['timestamp']['private'])
repository.write(consistent_snapshot=True)
# Move the staged metadata to the "live" metadata.
shutil.rmtree(os.path.join(self.repository_directory, 'metadata'))
shutil.copytree(os.path.join(self.repository_directory, 'metadata.staged'),
os.path.join(self.repository_directory, 'metadata'))
self.repository_updater.refresh()
# self.repository_updater.consistent_snapshot = True
self.repository_updater.download_target(target_fileinfo,
destination_directory)
repository.write(consistent_snapshot=True)
# Move the staged metadata to the "live" metadata.
shutil.rmtree(os.path.join(self.repository_directory, 'metadata'))
shutil.copytree(os.path.join(self.repository_directory, 'metadata.staged'),
os.path.join(self.repository_directory, 'metadata'))
# And ensure the client has the latest top-level metadata.
self.repository_updater.refresh()
target_filepath2 = target_filepaths.pop()
target_fileinfo2 = self.repository_updater.target(target_filepath2)
self.repository_updater.download_target(target_fileinfo2,
destination_directory)
# Test: Invalid arguments.
self.assertRaises(tuf.FormatError, self.repository_updater.download_target,
8, destination_directory)
random_target_filepath = target_filepaths.pop()
target_fileinfo = self.repository_updater.target(random_target_filepath)
self.assertRaises(tuf.FormatError, self.repository_updater.download_target,
target_fileinfo, 8)
# Non-existent destination.
# TODO: test for non-existent directories.
"""
self.assertRaises(tuf.Error, self.repository_updater.download_target,
target_fileinfo, 'non-existent/bad_path')
"""
# Test:
# Attempt a file download of a valid target, however, a download exception
# occurs because the target is not within the mirror's confined target
@ -1098,21 +1099,22 @@ def test_6_download_target(self):
# directories. get_list_of_mirrors() returns an empty list in this case,
# which does not generate specific exception errors.
self.assertEqual(len(exception.mirror_errors), 0)
def test_7_updated_targets(self):
# Verify that list contains all files that need to be updated, these
# files include modified and new target files. Also, confirm that files
# than need not to be updated are absent from the list.
# Verify that the list of targets returned by updated_targets() contains
# all the files that need to be updated, these files include modified and
# new target files. Also, confirm that files that need not to be updated
# are absent from the list.
# Setup
# Create temporary directory which will hold client's target files.
destination_directory = self.make_temp_directory()
# Get the list of target files. It will be used as an argument to
# 'updated_targets' function.
# Get the list of target files. It will be used as an argument to the
# 'updated_targets()' function.
all_targets = self.repository_updater.all_targets()
# Test for duplicates and targets in the root directory of the repository.
@ -1165,9 +1167,18 @@ def test_7_updated_targets(self):
target1 = os.path.join(self.repository_directory, 'targets', 'file1.txt')
repository.targets.remove_target(target1)
length, hashes = tuf.util.get_file_details(target1)
repository.targets.add_target(target1)
repository.targets.load_signing_key(self.role_keys['targets']['private'])
repository.snapshot.load_signing_key(self.role_keys['snapshot']['private'])
with open(target1, 'a') as file_object:
file_object.write('append extra text')
length, hashes = tuf.util.get_file_details(target1)
repository.targets.add_target(target1)
repository.targets.load_signing_key(self.role_keys['targets']['private'])
repository.snapshot.load_signing_key(self.role_keys['snapshot']['private'])
@ -1179,7 +1190,7 @@ def test_7_updated_targets(self):
shutil.copytree(os.path.join(self.repository_directory, 'metadata.staged'),
os.path.join(self.repository_directory, 'metadata'))
# Ensure the client has the up-to-date metadata.
# Ensure the client has up-to-date metadata.
self.repository_updater.refresh()
# Verify that the new target file is considered updated.

View file

@ -4,7 +4,7 @@
# and then run "tox" from this directory.
[tox]
envlist = py26, py27, py33, py34
envlist = py26, py27, py33, py34, py35
[testenv]
changedir = tests

View file

@ -100,6 +100,13 @@ def __str__(self):
class BadVersionNumberError(Error):
"""Indicate an error for metadata that contains an invalid version number."""
class BadPasswordError(Error):
"""Indicate an error after encountering an invalid password."""
pass

View file

@ -160,12 +160,10 @@ class Updater(object):
self.metadata_directory:
The directory where trusted metadata is stored.
self.fileinfo:
A cache of lengths and hashes of stored metadata files.
self.versioninfo:
A cache of version numbers for the roles available on the repository.
Example: {'root.json': {'length': 13323,
'hashes': {'sha256': dbfac345..}},
...}
Example: {'root.json': {'version': 128}, ...}
self.mirrors:
The repository mirrors from which metadata and targets are available.
@ -299,10 +297,11 @@ def __init__(self, updater_name, repository_mirrors):
# Store the previously trusted/verified metadata.
self.metadata['previous'] = {}
# Store the file information of all the metadata files. The dict keys are
# paths, the dict values fileinfo data. This information can help determine
# whether a metadata file has changed and so needs to be re-downloaded.
self.fileinfo = {}
# Store the version numbers of all roles available on the repository. The
# dict keys are paths, and the dict values versioninfo data. This
# information can help determine whether a metadata file has changed and
# needs to be re-downloaded.
self.versioninfo = {}
# Store the location of the client's metadata directory.
self.metadata_directory = {}
@ -576,8 +575,9 @@ def refresh(self, unsafely_update_root_if_necessary=True):
<Arguments>
unsafely_update_root_if_necessary:
Boolean that indicates whether to unsafely update the Root metadata
if any of the top-level metadata cannot be downloaded successfully.
Boolean that indicates whether to unsafely update the Root metadata if
any of the top-level metadata cannot be downloaded successfully. The
Root role is unsafely updated if its current version number is unknown.
<Exceptions>
tuf.NoWorkingMirrorError:
@ -601,22 +601,17 @@ def refresh(self, unsafely_update_root_if_necessary=True):
# Raise 'tuf.FormatError' if the check fail.
tuf.formats.BOOLEAN_SCHEMA.check_match(unsafely_update_root_if_necessary)
# The timestamp role does not have signed metadata about it; otherwise we
# The Timestamp role does not have signed metadata about it; otherwise we
# would need an infinite regress of metadata. Therefore, we use some
# default, sane metadata about it.
DEFAULT_TIMESTAMP_FILEINFO = {
'hashes': {},
'length': tuf.conf.DEFAULT_TIMESTAMP_REQUIRED_LENGTH
}
# default, but sane, upper file length for its metadata.
DEFAULT_TIMESTAMP_UPPERLENGTH = tuf.conf.DEFAULT_TIMESTAMP_REQUIRED_LENGTH
# The Root role may be updated without knowing its hash if top-level
# metadata cannot be safely downloaded (e.g., keys may have been revoked,
# thus requiring a new Root file that includes the updated keys) and
# 'unsafely_update_root_if_necessary' is True.
DEFAULT_ROOT_FILEINFO = {
'hashes': {},
'length': tuf.conf.DEFAULT_ROOT_REQUIRED_LENGTH
}
# The Root role may be updated without knowing its version number if
# top-level metadata cannot be safely downloaded (e.g., keys may have been
# revoked, thus requiring a new Root file that includes the updated keys)
# and 'unsafely_update_root_if_necessary' is True.
# We use some default, but sane, upper file length for its metadata.
DEFAULT_ROOT_UPPERLENGTH = tuf.conf.DEFAULT_ROOT_REQUIRED_LENGTH
# Update the top-level metadata. The _update_metadata_if_changed() and
# _update_metadata() calls below do NOT perform an update if there
@ -636,12 +631,12 @@ def refresh(self, unsafely_update_root_if_necessary=True):
except tuf.ExpiredMetadataError as e:
# Raise 'tuf.NoWorkingMirrorError' if a valid (not expired, properly
# signed, and valid metadata) 'root' cannot be installed.
# signed, and valid metadata) 'root.json' cannot be installed.
if unsafely_update_root_if_necessary:
message = \
'Expired Root metadata was loaded from disk. Try to update it now.'
logger.info(message)
self._update_metadata('root', DEFAULT_ROOT_FILEINFO)
self._update_metadata('root', DEFAULT_ROOT_UPPERLENGTH)
# The caller explicitly requested not to unsafely fetch an expired Root.
else:
@ -651,7 +646,7 @@ def refresh(self, unsafely_update_root_if_necessary=True):
# Use default but sane information for timestamp metadata, and do not
# require strict checks on its required length.
try:
self._update_metadata('timestamp', DEFAULT_TIMESTAMP_FILEINFO)
self._update_metadata('timestamp', DEFAULT_TIMESTAMP_UPPERLENGTH)
self._update_metadata_if_changed('snapshot',
referenced_metadata='timestamp')
self._update_metadata_if_changed('root')
@ -663,7 +658,7 @@ def refresh(self, unsafely_update_root_if_necessary=True):
'update the Root metadata.'
logger.info(message)
self._update_metadata('root', DEFAULT_ROOT_FILEINFO)
self._update_metadata('root', DEFAULT_ROOT_UPPERLENGTH)
self.refresh(unsafely_update_root_if_necessary=False)
else:
@ -874,7 +869,9 @@ def _verify_uncompressed_metadata_file(self, metadata_file_object,
metadata_role):
"""
<Purpose>
Non-public method that verifies an uncompressed metadata file.
Non-public method that verifies an uncompressed metadata file. An
exception is raised if 'metadata_file_object is invalid, and there is no
return value.
<Arguments>
metadata_file_object:
@ -906,7 +903,7 @@ def _verify_uncompressed_metadata_file(self, metadata_file_object,
In case the metadata file does not have a valid signature.
<Side Effects>
The contents of 'metadata_file_object' is read and loaded.
The content of 'metadata_file_object' is read and loaded.
<Returns>
None.
@ -927,19 +924,10 @@ def _verify_uncompressed_metadata_file(self, metadata_file_object,
# Is 'metadata_signable' expired?
self._ensure_not_expired(metadata_signable['signed'], metadata_role)
# Is 'metadata_signable' newer than the currently installed
# version?
current_metadata_role = self.metadata['current'].get(metadata_role)
# Compare metadata version numbers. Ensure there is a current
# version of the metadata role to be updated.
if current_metadata_role is not None:
current_version = current_metadata_role['version']
downloaded_version = metadata_signable['signed']['version']
if downloaded_version < current_version:
raise tuf.ReplayedMetadataError(metadata_role, downloaded_version,
current_version)
# We previously verified version numbers in this function, but have since
# moved version number verification to the functions that retrieve
# metadata.
# Reject the metadata if any specified targets are not allowed.
# 'tuf.ForbiddenTargetError' raised if any of the targets of 'metadata_role'
@ -1046,6 +1034,130 @@ def unsafely_verify_compressed_metadata_file(metadata_file_object):
def _get_metadata_file(self, metadata_role, remote_filename,
upperbound_filelength, expected_version,
compression_algorithm):
"""
<Purpose>
Non-public method that tries downloading, up to a certain length, a
metadata file from a list of known mirrors. As soon as the first valid
copy of the file is found, the downloaded file is returned and the
remaining mirrors are skipped.
<Arguments>
metadata_role:
The role name of the metadata (e.g., 'root', 'targets',
'targets/linux/x86').
remote_filename:
The relative file path (on the remove repository) of 'metadata_role'.
upperbound_filelength:
The expected length, or upper bound, of the metadata file to be
downloaded.
expected_version:
The expected and required version number of the 'metadata_role' file
downloaded. 'expected_version' is an integer.
compression_algorithm:
The name of the compression algorithm (e.g., 'gzip'). The algorithm is
needed if the remote metadata file is compressed.
<Exceptions>
tuf.NoWorkingMirrorError:
The metadata could not be fetched. This is raised only when all known
mirrors failed to provide a valid copy of the desired metadata file.
<Side Effects>
The file is downloaded from all known repository mirrors in the worst
case. If a valid copy of the file is found, it is stored in a temporary
file and returned.
<Returns>
A 'tuf.util.TempFile' file-like object containing the metadata.
"""
file_mirrors = tuf.mirrors.get_list_of_mirrors('meta', remote_filename,
self.mirrors)
# file_mirror (URL): error (Exception)
file_mirror_errors = {}
file_object = None
for file_mirror in file_mirrors:
try:
file_object = tuf.download.unsafe_download(file_mirror,
upperbound_filelength)
if compression_algorithm is not None:
logger.info('Decompressing ' + str(file_mirror))
file_object.decompress_temp_file_object(compression_algorithm)
else:
logger.info('Not decompressing ' + str(file_mirror))
# Verify 'file_object' according to the callable function.
# 'file_object' is also verified if decompressed above (i.e., the
# uncompressed version).
metadata_signable = \
tuf.util.load_json_string(file_object.read().decode('utf-8'))
# If the version number is unspecified, ensure that the version number
# downloaded is greater than the currently trusted version number for
# 'metadata_role'.
version_downloaded = metadata_signable['signed']['version']
if expected_version is not None:
# Verify that the downloaded version matches the version expected by
# the caller.
if version_downloaded != expected_version:
message = \
'Downloaded version number: ' + repr(version_downloaded) + '.' \
' Version number MUST be: ' + repr(expected_version)
raise tuf.BadVersionNumberError(message)
# The caller does not know which version to download. Verify that the
# downloaded version is at least greater than the one locally available.
else:
# Verify that the version number of the locally stored
# 'timestamp.json', if available, is less than what was downloaded.
# Otherwise, accept the new timestamp with version number
# 'version_downloaded'.
logger.info('metadata_role: ' + repr(metadata_role))
try:
current_version = \
self.metadata['current'][metadata_role]['version']
if version_downloaded < current_version:
raise tuf.ReplayedMetadataError(metadata_role, version_downloaded,
current_version)
except KeyError:
logger.info(metadata_role + ' not available locally.')
self._verify_uncompressed_metadata_file(file_object, metadata_role)
except Exception as exception:
# Remember the error from this mirror, and "reset" the target file.
logger.exception('Update failed from ' + file_mirror + '.')
file_mirror_errors[file_mirror] = exception
file_object = None
else:
break
if file_object:
return file_object
else:
logger.exception('Failed to update {0} from all mirrors: {1}'.format(
remote_filename, file_mirror_errors))
raise tuf.NoWorkingMirrorError(file_mirror_errors)
def _safely_get_metadata_file(self, metadata_role, metadata_filepath,
uncompressed_fileinfo,
compression=None, compressed_fileinfo=None):
@ -1234,8 +1346,8 @@ def _get_file(self, filepath, verify_file_function, file_type,
def _update_metadata(self, metadata_role, uncompressed_fileinfo,
compression=None, compressed_fileinfo=None):
def _update_metadata(self, metadata_role, upperbound_filelength, version=None,
compression_algorithm=None):
"""
<Purpose>
Non-public method that downloads, verifies, and 'installs' the metadata
@ -1248,31 +1360,21 @@ def _update_metadata(self, metadata_role, uncompressed_fileinfo,
metadata_role:
The name of the metadata. This is a role name and should not end
in '.json'. Examples: 'root', 'targets', 'targets/linux/x86'.
uncompressed_fileinfo:
A dictionary containing length and hashes of the uncompressed metadata
file.
Example:
upperbound_filelength:
The expected length, or upper bound, of the metadata file to be
downloaded.
{"hashes": {"sha256": "3a5a6ec1f353...dedce36e0"},
"length": 1340}
compression:
version:
The expected and required version number of the 'metadata_role' file
downloaded. 'expected_version' is an integer.
compression_algorithm:
A string designating the compression type of 'metadata_role'.
The 'snapshot' metadata file may be optionally downloaded and stored in
compressed form. Currently, only metadata files compressed with 'gzip'
are considered. Any other string is ignored.
compressed_fileinfo:
A dictionary containing length and hashes of the compressed metadata
file.
Example:
{"hashes": {"sha256": "3a5a6ec1f353...dedce36e0"},
"length": 1340}
<Exceptions>
tuf.NoWorkingMirrorError:
The metadata cannot be updated. This is not specific to a single
@ -1294,7 +1396,7 @@ def _update_metadata(self, metadata_role, uncompressed_fileinfo,
# The 'snapshot' or Targets metadata may be compressed. Add the appropriate
# extension to 'metadata_filename'.
if compression == 'gzip':
if compression_algorithm == 'gzip':
metadata_filename = metadata_filename + '.gz'
# Attempt a file download from each mirror until the file is downloaded and
@ -1307,45 +1409,29 @@ def _update_metadata(self, metadata_role, uncompressed_fileinfo,
#
# Some metadata (presently timestamp) will be downloaded "unsafely", in the
# sense that we can only estimate its true length and know nothing about
# its hashes. This is because not all metadata will have other metadata
# its version. This is because not all metadata will have other metadata
# for it; otherwise we will have an infinite regress of metadata signing
# for each other. In this case, we will download the metadata up to the
# best length we can get for it, not check its hashes, but perform the rest
# of the checks (e.g signature verification).
# best length we can get for it, not request a specific version, but
# perform the rest of the checks (e.g., signature verification).
#
# Note also that we presently support decompression of only "safe"
# metadata, but this is easily extend to "unsafe" metadata as well as
# "safe" targets.
if metadata_role == 'timestamp':
metadata_file_object = \
self._unsafely_get_metadata_file(metadata_role, metadata_filename,
uncompressed_fileinfo,
compression, compressed_fileinfo)
elif metadata_role == 'root' and not len(uncompressed_fileinfo['hashes']):
metadata_file_object = \
self._unsafely_get_metadata_file(metadata_role, metadata_filename,
uncompressed_fileinfo,
compression, compressed_fileinfo)
else:
remote_filename = metadata_filename
if self.consistent_snapshot:
if compression:
filename_digest = \
random.choice(list(compressed_fileinfo['hashes'].values()))
else:
filename_digest = \
random.choice(list(uncompressed_fileinfo['hashes'].values()))
dirname, basename = os.path.split(remote_filename)
remote_filename = os.path.join(dirname, filename_digest+'.'+basename)
remote_filename = metadata_filename
filename_version = ''
metadata_file_object = \
self._safely_get_metadata_file(metadata_role, remote_filename,
uncompressed_fileinfo,
compression, compressed_fileinfo)
if self.consistent_snapshot:
filename_version = version
dirname, basename = os.path.split(remote_filename)
remote_filename = os.path.join(dirname, str(filename_version) + '.' + basename)
logger.info('Verifying ' + repr(metadata_role) + ' requesting version: ' + repr(version))
metadata_file_object = \
self._get_metadata_file(metadata_role, remote_filename,
upperbound_filelength, version,
compression_algorithm)
# The metadata has been verified. Move the metadata file into place.
# First, move the 'current' metadata file to the 'previous' directory
@ -1367,8 +1453,9 @@ def _update_metadata(self, metadata_role, uncompressed_fileinfo,
# Next, move the verified updated metadata file to the 'current' directory.
# Note that the 'move' method comes from tuf.util's TempFile class.
# 'metadata_file_object' is an instance of tuf.util.TempFile.
metadata_signable = tuf.util.load_json_string(metadata_file_object.read().decode('utf-8'))
if compression == 'gzip':
metadata_signable = \
tuf.util.load_json_string(metadata_file_object.read().decode('utf-8'))
if compression_algorithm == 'gzip':
current_uncompressed_filepath = \
os.path.join(self.metadata_directory['current'],
uncompressed_metadata_filename)
@ -1389,10 +1476,10 @@ def _update_metadata(self, metadata_role, uncompressed_fileinfo,
# key and role info for the top-level roles if 'metadata_role' is root.
# Rebuilding the the key and role info is required if the newly-installed
# root metadata has revoked keys or updated any top-level role information.
logger.debug('Updated '+repr(current_filepath)+'.')
logger.debug('Updated ' + repr(current_filepath) + '.')
self.metadata['previous'][metadata_role] = current_metadata_object
self.metadata['current'][metadata_role] = updated_metadata_object
self._update_fileinfo(uncompressed_metadata_filename)
self._update_versioninfo(uncompressed_metadata_filename)
# Ensure the role and key information of the top-level roles is also updated
# according to the newly-installed Root metadata.
@ -1468,15 +1555,15 @@ def _update_metadata_if_changed(self, metadata_role,
# Ensure the referenced metadata has been loaded. The 'root' role may be
# updated without having 'snapshot' available.
if referenced_metadata not in self.metadata['current']:
message = 'Cannot update '+repr(metadata_role)+' because ' \
+referenced_metadata+' is missing.'
message = 'Cannot update ' + repr(metadata_role) + ' because ' \
+ referenced_metadata + ' is missing.'
raise tuf.RepositoryError(message)
# The referenced metadata has been loaded. Extract the new
# fileinfo for 'metadata_role' from it.
# versioninfo for 'metadata_role' from it.
else:
message = repr(metadata_role)+' referenced in '+\
repr(referenced_metadata)+'. '+repr(metadata_role)+' may be updated.'
message = repr(metadata_role) + ' referenced in ' +\
repr(referenced_metadata)+ '. ' + repr(metadata_role)+' may be updated.'
logger.debug(message)
# There might be a compressed version of 'snapshot.json' or Targets
@ -1493,47 +1580,58 @@ def _update_metadata_if_changed(self, metadata_role,
# untrusted data is not decompressed prior to verifying hashes, or
# decompressing a file that may be invalid or partially intact.
compression = None
compressed_fileinfo = None
# Extract the fileinfo of the uncompressed version of 'metadata_role'.
uncompressed_fileinfo = self.metadata['current'][referenced_metadata] \
['meta'] \
[uncompressed_metadata_filename]
# Extract the versioninfo of the uncompressed version of 'metadata_role'.
expected_versioninfo = self.metadata['current'][referenced_metadata] \
['meta'] \
[uncompressed_metadata_filename]
# Check for the availability of compressed versions of 'snapshot.json',
# 'targets.json', and delegated Targets (that also start with 'targets').
# For 'targets.json' and delegated metadata, 'referenced_metata'
# should always be 'snapshot'. 'snapshot.json' specifies all roles
# provided by a repository, including their file lengths and hashes.
# provided by a repository, including their version numbers.
if metadata_role == 'snapshot' or metadata_role.startswith('targets'):
gzip_metadata_filename = uncompressed_metadata_filename + '.gz'
if gzip_metadata_filename in self.metadata['current'] \
[referenced_metadata]['meta']:
if 'gzip' in self.metadata['current']['root']['compression_algorithms']:
compression = 'gzip'
compressed_fileinfo = self.metadata['current'][referenced_metadata] \
['meta'][gzip_metadata_filename]
logger.debug('Compressed version of '+\
repr(uncompressed_metadata_filename)+' is available at '+\
repr(gzip_metadata_filename)+'.')
logger.debug('Compressed version of ' + \
repr(uncompressed_metadata_filename) + ' is available at ' + \
repr(gzip_metadata_filename) + '.')
else:
logger.debug('Compressed version of '+\
repr(uncompressed_metadata_filename)+' not available.')
logger.debug('Compressed version of ' + \
repr(uncompressed_metadata_filename) + ' not available.')
# Simply return if the file has not changed, according to the metadata
# about the uncompressed file provided by the referenced metadata.
if not self._fileinfo_has_changed(uncompressed_metadata_filename,
uncompressed_fileinfo):
logger.info(repr(uncompressed_metadata_filename)+' up-to-date.')
if not self._versioninfo_has_changed(uncompressed_metadata_filename,
expected_versioninfo):
logger.info(repr(uncompressed_metadata_filename) + ' up-to-date.')
return
logger.debug('Metadata '+repr(uncompressed_metadata_filename)+\
logger.debug('Metadata ' + repr(uncompressed_metadata_filename) + \
' has changed.')
# The file lengths of metadata are unknown, only their version numbers are
# known. Set an upper limit for the length of the downloaded file for each
# expected role. Note: The Timestamp role is not updated via this
# function.
if metadata_role == 'snapshot':
upperbound_filelength = tuf.conf.DEFAULT_SNAPSHOT_REQUIRED_LENGTH
elif metadata_role == 'root':
upperbound_filelength = tuf.conf.DEFAULT_ROOT_REQUIRED_LENGTH
# The metadata is considered Targets (or delegated Targets metadata).
else:
upperbound_filelength = tuf.conf.DEFAULT_TARGETS_REQUIRED_LENGTH
try:
self._update_metadata(metadata_role, uncompressed_fileinfo, compression,
compressed_fileinfo)
self._update_metadata(metadata_role, upperbound_filelength,
expected_versioninfo['version'], compression)
except:
# The current metadata we have is not current but we couldn't
# get new metadata. We shouldn't use the old metadata anymore.
@ -1544,14 +1642,14 @@ def _update_metadata_if_changed(self, metadata_role,
# We shouldn't need to, but we need to check the trust
# implications of the current implementation.
self._delete_metadata(metadata_role)
logger.error('Metadata for '+repr(metadata_role)+' cannot be updated.')
logger.error('Metadata for ' +repr(metadata_role) + ' cannot be updated.')
raise
else:
# We need to remove delegated roles because the delegated roles
# may not be trusted anymore.
# We need to remove delegated roles because the delegated roles may not
# be trusted anymore.
if metadata_role == 'targets' or metadata_role.startswith('targets/'):
logger.debug('Removing delegated roles of '+repr(metadata_role)+'.')
logger.debug('Removing delegated roles of ' + repr(metadata_role) + '.')
# TODO: Should we also remove the keys of the delegated roles?
tuf.roledb.remove_delegated_roles(metadata_role)
@ -1561,85 +1659,73 @@ def _update_metadata_if_changed(self, metadata_role,
def _fileinfo_has_changed(self, metadata_filename, new_fileinfo):
def _versioninfo_has_changed(self, metadata_filename, new_versioninfo):
"""
<Purpose>
Non-public method that determines whether the current fileinfo of
'metadata_filename' differs from 'new_fileinfo'. The 'new_fileinfo'
Non-public method that determines whether the current versioninfo of
'metadata_filename' differs from 'new_versioninfo'. The 'new_versioninfo'
argument should be extracted from the latest copy of the metadata that
references 'metadata_filename'. Example: 'root.json' would be referenced
by 'snapshot.json'.
'new_fileinfo' should only be 'None' if this is for updating 'root.json'
without having 'snapshot.json' available.
'new_versioninfo' should only be 'None' if this is for updating
'root.json' without having 'snapshot.json' available.
<Arguments>
metadadata_filename:
The metadata filename for the role. For the 'root' role,
'metadata_filename' would be 'root.json'.
new_fileinfo:
new_versioninfo:
A dict object representing the new file information for
'metadata_filename'. 'new_fileinfo' may be 'None' when
'metadata_filename'. 'new_versioninfo' may be 'None' when
updating 'root' without having 'snapshot' available. This
dict conforms to 'tuf.formats.FILEINFO_SCHEMA' and has
dict conforms to 'tuf.formats.VERSIONINFO_SCHEMA' and has
the form:
{'length': 23423
'hashes': {'sha256': adfbc32343..}}
{'version': 288}
<Exceptions>
None.
<Side Effects>
If there is no fileinfo currently loaded for 'metada_filename',
If there is no versioninfo currently loaded for 'metada_filename',
try to load it.
<Returns>
Boolean. True if the fileinfo has changed, false otherwise.
Boolean. True if the versioninfo has changed, false otherwise.
"""
# If there is no fileinfo currently stored for 'metadata_filename',
# try to load the file, calculate the fileinfo, and store it.
if metadata_filename not in self.fileinfo:
self._update_fileinfo(metadata_filename)
# If there is no versioninfo currently stored for 'metadata_filename',
# try to load the file, calculate the versioninfo, and store it.
if metadata_filename not in self.versioninfo:
self._update_versioninfo(metadata_filename)
# Return true if there is no fileinfo for 'metadata_filename'.
# 'metadata_filename' is not in the 'self.fileinfo' store
# Return true if there is no versioninfo for 'metadata_filename'.
# 'metadata_filename' is not in the 'self.versioninfo' store
# and it doesn't exist in the 'current' metadata location.
if self.fileinfo[metadata_filename] is None:
if self.versioninfo[metadata_filename] is None:
return True
current_fileinfo = self.fileinfo[metadata_filename]
current_versioninfo = self.versioninfo[metadata_filename]
if current_fileinfo['length'] != new_fileinfo['length']:
if new_versioninfo['version'] > current_versioninfo['version']:
return True
# Now compare hashes. Note that the reason we can't just do a simple
# equality check on the fileinfo dicts is that we want to support the
# case where the hash algorithms listed in the metadata have changed
# without having that result in considering all files as needing to be
# updated, or not all hash algorithms listed can be calculated on the
# specific client.
for algorithm, hash_value in six.iteritems(new_fileinfo['hashes']):
# We're only looking for a single match. This isn't a security
# check, we just want to prevent unnecessary downloads.
if algorithm in current_fileinfo['hashes']:
if hash_value == current_fileinfo['hashes'][algorithm]:
return False
return True
else:
return False
def _update_fileinfo(self, metadata_filename):
def _update_versioninfo(self, metadata_filename):
"""
<Purpose>
Non-public method that updates the 'self.fileinfo' entry for the metadata
belonging to 'metadata_filename'. If the 'current' metadata for
'metadata_filename' cannot be loaded, set its fileinfo' to 'None' to
signal that it is not in the 'self.fileinfo' AND it also doesn't exist
Non-public method that updates the 'self.versioninfo' entry for the
metadata belonging to 'metadata_filename'. If the current metadata for
'metadata_filename' cannot be loaded, set its 'versioninfo' to 'None' to
signal that it is not in 'self.versioninfo' AND it also doesn't exist
locally.
<Arguments>
@ -1651,32 +1737,67 @@ def _update_fileinfo(self, metadata_filename):
None.
<Side Effects>
The file details of 'metadata_filename' is calculated and
stored in 'self.fileinfo'.
The version number of 'metadata_filename' is calculated and stored in its
corresponding entry in 'self.versioninfo'.
<Returns>
None.
"""
# In case we delayed loading the metadata and didn't do it in
# __init__ (such as with delegated metadata), then get the file
# __init__ (such as with delegated metadata), then get the version
# info now.
# Save the path to the current metadata file for 'metadata_filename'.
current_filepath = os.path.join(self.metadata_directory['current'],
metadata_filename)
# If the path is invalid, simply return and leave fileinfo unset.
# If the path is invalid, simply return and leave versioninfo unset.
if not os.path.exists(current_filepath):
self.fileinfo[metadata_filename] = None
self.versioninfo[metadata_filename] = None
return
# Extract the file information from the actual file and save it
# to the fileinfo store.
file_length, hashes = tuf.util.get_file_details(current_filepath)
metadata_fileinfo = tuf.formats.make_fileinfo(file_length, hashes)
self.fileinfo[metadata_filename] = metadata_fileinfo
# Extract the version information from the trusted snapshot role and save
# it to the 'self.versioninfo' store.
if metadata_filename == 'timestamp.json':
trusted_versioninfo = \
self.metadata['current']['timestamp']['version']
# When updating snapshot.json, the client either (1) has a copy of
# snapshot.json, or (2) in the process of obtaining it by first downloading
# timestamp.json. Note: Clients may have only root.json and perform a
# refresh of top-level metadata to obtain the remaining roles.
elif metadata_filename == 'snapshot.json':
# Verify the version number of the currently trusted snapshot.json in
# snapshot.json itself. Checking the version number specified in
# timestamp.json may be greater than the version specified in the
# client's copy of snapshot.json.
try:
timestamp_version_number = self.metadata['current']['snapshot']['version']
trusted_versioninfo = tuf.formats.make_versioninfo(timestamp_version_number)
except KeyError:
trusted_versioninfo = \
self.metadata['current']['timestamp']['meta']['snapshot.json']
else:
try:
# The metadata file names in 'self.metadata' exclude the role
# extension. Strip the '.json' extension when checking if
# 'metadata_filename' currently exists.
targets_version_number = \
self.metadata['current'][metadata_filename[:-len('.json')]]['version']
trusted_versioninfo = \
tuf.formats.make_versioninfo(targets_version_number)
except KeyError:
trusted_versioninfo = \
self.metadata['current']['snapshot']['meta'][metadata_filenamed]
self.versioninfo[metadata_filename] = trusted_versioninfo
@ -2721,20 +2842,15 @@ def download_target(self, target, destination_directory):
target_filepath.lstrip(os.sep))
destination = os.path.abspath(destination)
target_dirpath = os.path.dirname(destination)
if target_dirpath:
try:
os.makedirs(target_dirpath)
except OSError as e:
if e.errno == errno.EEXIST:
pass
else:
raise
else:
message = repr(target_dirpath) + ' does not exist.'
logger.warning(message)
raise tuf.Error(message)
try:
os.makedirs(target_dirpath)
except OSError as e:
if e.errno == errno.EEXIST:
pass
else:
raise
target_file_object.move(destination)

View file

@ -58,12 +58,21 @@
# default but sane upper bound for the number of bytes required to download it.
DEFAULT_TIMESTAMP_REQUIRED_LENGTH = 16384 #bytes
# The Root role may be updated without knowing its hash if top-level metadata
# cannot be safely downloaded (e.g., keys may have been revoked, thus requiring
# a new Root file that includes the updated keys). Set a default upper bound
# for the maximum total bytes that may be downloaded for Root metadata.
# The Root role may be updated without knowing its version if top-level
# metadata cannot be safely downloaded (e.g., keys may have been revoked, thus
# requiring a new Root file that includes the updated keys). Set a default
# upper bound for the maximum total bytes that may be downloaded for Root
# metadata.
DEFAULT_ROOT_REQUIRED_LENGTH = 512000 #bytes
# Set a default, but sane, upper bound for the number of bytes required to
# download Snapshot metadata.
DEFAULT_SNAPSHOT_REQUIRED_LENGTH = 2000000 #bytes
# Set a default, but sane, upper bound for the number of bytes required to
# download Targets metadata.
DEFAULT_TARGETS_REQUIRED_LENGTH = 5000000 #bytes
# Set a timeout value in seconds (float) for non-blocking socket operations.
SOCKET_TIMEOUT = 2 #seconds

View file

@ -526,7 +526,8 @@ def _generate_and_write_metadata(rolename, metadata_filename, write_partial,
if tuf.sig.verify(signable, rolename) or write_partial:
_remove_invalid_and_duplicate_signatures(signable)
compressions = roleinfo['compressions']
filename = write_metadata_file(signable, metadata_filename, compressions,
filename = write_metadata_file(signable, metadata_filename,
metadata['version'], compressions,
False)
# 'signable' contains an invalid threshold of signatures.

View file

@ -413,7 +413,7 @@ def _get_opener(scheme=None):
https_handler = VerifiedHTTPSHandler()
opener = six.moves.urllib.request.build_opener(https_handler)
# strip out HTTPHandler to prevent MITM spoof
# Strip out HTTPHandler to prevent MITM spoof.
for handler in opener.handlers:
if isinstance(handler, six.moves.urllib.request.HTTPHandler):
opener.handlers.remove(handler)
@ -662,7 +662,7 @@ def connect(self):
self._tunnel()
# set location of certificate authorities
assert os.path.isfile( tuf.conf.ssl_certificates )
assert os.path.isfile(tuf.conf.ssl_certificates)
cert_path = tuf.conf.ssl_certificates
# TODO: Disallow SSLv2.

View file

@ -262,17 +262,35 @@
keyid = KEYID_SCHEMA,
keyval = KEYVAL_SCHEMA)
# Info that describes both metadata and target files.
# This schema allows the storage of multiple hashes for the same file
# (e.g., sha256 and sha512 may be computed for the same file and stored).
# Information about target files, like file length and file hash(es). This
# schema allows the storage of multiple hashes for the same file (e.g., sha256
# and sha512 may be computed for the same file and stored).
FILEINFO_SCHEMA = SCHEMA.Object(
object_name = 'FILEINFO_SCHEMA',
length = LENGTH_SCHEMA,
hashes = HASHDICT_SCHEMA,
custom = SCHEMA.Optional(SCHEMA.Object()))
# A dict holding the information for a particular file. The keys hold the
# relative file path and the values the relevant file information.
# Version information specified in "snapshot.json" for each role available on
# the TUF repository. The 'FILEINFO_SCHEMA' object was previously listed in
# the snapshot role, but was switched to this object format to reduce the
# amount of metadata that needs to be downloaded. Listing version numbers in
# "snapshot.json" also prevents rollback attacks for roles that clients have
# not downloaded.
VERSIONINFO_SCHEMA = SCHEMA.Object(
object_name = 'VERSIONINFO_SCHEMA',
version = METADATAVERSION_SCHEMA)
# A dict holding the version information for a particular metadata role. The
# dict keys hold the relative file paths, and the dict values the corresponding
# version numbers.
VERSIONDICT_SCHEMA = SCHEMA.DictOf(
key_schema = RELPATH_SCHEMA,
value_schema = VERSIONINFO_SCHEMA)
# A dict holding the information for a particular target / file. The dict keys
# hold the relative file paths, and the dict values the corresponding file
# information.
FILEDICT_SCHEMA = SCHEMA.DictOf(
key_schema = RELPATH_SCHEMA,
value_schema = FILEINFO_SCHEMA)
@ -286,9 +304,9 @@
# A list of TARGETFILE_SCHEMA.
TARGETFILES_SCHEMA = SCHEMA.ListOf(TARGETFILE_SCHEMA)
# A single signature of an object. Indicates the signature, the id of the
# A single signature of an object. Indicates the signature, the ID of the
# signing key, and the signing method.
# I debated making the signature schema not contain the key id and instead have
# I debated making the signature schema not contain the key ID and instead have
# the signatures of a file be a dictionary with the key being the keyid and the
# value being the signature schema without the keyid. That would be under
# the argument that a key should only be able to sign a file once. However,
@ -439,6 +457,7 @@
_type = SCHEMA.String('Root'),
version = METADATAVERSION_SCHEMA,
consistent_snapshot = BOOLEAN_SCHEMA,
compression_algorithms = COMPRESSIONS_SCHEMA,
expires = ISO8601_DATETIME_SCHEMA,
keys = KEYDICT_SCHEMA,
roles = ROLEDICT_SCHEMA)
@ -458,7 +477,7 @@
_type = SCHEMA.String('Snapshot'),
version = METADATAVERSION_SCHEMA,
expires = ISO8601_DATETIME_SCHEMA,
meta = FILEDICT_SCHEMA)
meta = VERSIONDICT_SCHEMA)
# Timestamp role: indicates the latest version of the snapshot file.
TIMESTAMP_SCHEMA = SCHEMA.Object(
@ -466,7 +485,7 @@
_type = SCHEMA.String('Timestamp'),
version = METADATAVERSION_SCHEMA,
expires = ISO8601_DATETIME_SCHEMA,
meta = FILEDICT_SCHEMA)
meta = VERSIONDICT_SCHEMA)
# project.cfg file: stores information about the project in a json dictionary
PROJECT_CFG_SCHEMA = SCHEMA.Object(
@ -554,11 +573,11 @@ def __getattr__(self, name):
class TimestampFile(MetaFile):
def __init__(self, version, expires, filedict):
def __init__(self, version, expires, versiondict):
self.info = {}
self.info['version'] = version
self.info['expires'] = expires
self.info['meta'] = filedict
self.info['meta'] = versiondict
@staticmethod
@ -569,17 +588,17 @@ def from_metadata(object):
version = object['version']
expires = object['expires']
filedict = object['meta']
versiondict = object['meta']
return TimestampFile(version, expires, filedict)
return TimestampFile(version, expires, versiondict)
@staticmethod
def make_metadata(version, expiration_date, filedict):
def make_metadata(version, expiration_date, versiondict):
result = {'_type' : 'Timestamp'}
result['version'] = version
result['expires'] = expiration_date
result['meta'] = filedict
result['meta'] = versiondict
# Is 'result' a Timestamp metadata file?
# Raise 'tuf.FormatError' if not.
@ -592,13 +611,15 @@ def make_metadata(version, expiration_date, filedict):
class RootFile(MetaFile):
def __init__(self, version, expires, keys, roles, consistent_snapshot):
def __init__(self, version, expires, keys, roles, consistent_snapshot,
compression_algorithms):
self.info = {}
self.info['version'] = version
self.info['expires'] = expires
self.info['keys'] = keys
self.info['roles'] = roles
self.info['consistent_snapshot'] = consistent_snapshot
self.info['compression_algorithms'] = compression_algorithms
@staticmethod
@ -612,19 +633,22 @@ def from_metadata(object):
keys = object['keys']
roles = object['roles']
consistent_snapshot = object['consistent_snapshot']
compression_algorithms = object['compression_algorithms']
return RootFile(version, expires, keys, roles, consistent_snapshot)
return RootFile(version, expires, keys, roles, consistent_snapshot,
compression_algorithms)
@staticmethod
def make_metadata(version, expiration_date, keydict, roledict,
consistent_snapshot):
consistent_snapshot, compression_algorithms):
result = {'_type' : 'Root'}
result['version'] = version
result['expires'] = expiration_date
result['keys'] = keydict
result['roles'] = roledict
result['consistent_snapshot'] = consistent_snapshot
result['compression_algorithms'] = compression_algorithms
# Is 'result' a Root metadata file?
# Raise 'tuf.FormatError' if not.
@ -637,11 +661,11 @@ def make_metadata(version, expiration_date, keydict, roledict,
class SnapshotFile(MetaFile):
def __init__(self, version, expires, filedict):
def __init__(self, version, expires, versiondict):
self.info = {}
self.info['version'] = version
self.info['expires'] = expires
self.info['meta'] = filedict
self.info['meta'] = versiondict
@staticmethod
@ -652,17 +676,17 @@ def from_metadata(object):
version = object['version']
expires = object['expires']
filedict = object['meta']
versiondict = object['meta']
return SnapshotFile(version, expires, filedict)
return SnapshotFile(version, expires, versiondict)
@staticmethod
def make_metadata(version, expiration_date, filedict):
def make_metadata(version, expiration_date, versiondict):
result = {'_type' : 'Snapshot'}
result['version'] = version
result['expires'] = expiration_date
result['meta'] = filedict
result['meta'] = versiondict
# Is 'result' a Snapshot metadata file?
# Raise 'tuf.FormatError' if not.
@ -1000,6 +1024,42 @@ def make_fileinfo(length, hashes, custom=None):
def make_versioninfo(version_number):
"""
<Purpose>
Create a dictionary conformant to 'VERSIONINFO_SCHEMA'. This dict
describes both metadata and target files.
<Arguments>
version_number:
An integer representing the version of a particular metadata role.
The dictionary returned by this function is expected to be included
in Snapshot metadata.
<Exceptions>
tuf.FormatError, if the 'VERSIONINFO_SCHEMA' to be returned
does not have the correct format.
<Side Effects>
If any of the arguments are incorrectly formatted, the dict
returned will be checked for formatting errors, and if found,
will raise a 'tuf.FormatError' exception.
<Returns>
A dictionary conformant to 'VERSIONINFO_SCHEMA', containing the version
information of a metadata role.
"""
versioninfo = {'version' : version_number}
# Raise 'tuf.FormatError' if 'versioninfo' is improperly formatted.
VERSIONINFO_SCHEMA.check_match(versioninfo)
return versioninfo
def make_role_metadata(keyids, threshold, name=None, paths=None,
path_hash_prefixes=None):
"""

View file

@ -99,12 +99,13 @@
SUPPORTED_COMPRESSION_EXTENSIONS = ['.gz']
# The full list of supported TUF metadata extensions.
METADATA_EXTENSIONS = ['.json', '.json.gz']
METADATA_EXTENSIONS = ['.json']
def _generate_and_write_metadata(rolename, metadata_filename, write_partial,
targets_directory, metadata_directory,
consistent_snapshot=False, filenames=None):
consistent_snapshot=False, filenames=None,
compression_algorithms=['gz']):
"""
Non-public function that can generate and write the metadata of the specified
top-level 'rolename'. It also increments version numbers if:
@ -120,12 +121,12 @@ def _generate_and_write_metadata(rolename, metadata_filename, write_partial,
# Retrieve the roleinfo of 'rolename' to extract the needed metadata
# attributes, such as version number, expiration, etc.
roleinfo = tuf.roledb.get_roleinfo(rolename)
snapshot_compressions = tuf.roledb.get_roleinfo('snapshot')['compressions']
# Generate the appropriate role metadata for 'rolename'.
if rolename == 'root':
metadata = generate_root_metadata(roleinfo['version'],
roleinfo['expires'], consistent_snapshot)
roleinfo['expires'], consistent_snapshot,
compression_algorithms)
_log_warning_if_expires_soon(ROOT_FILENAME, roleinfo['expires'],
ROOT_EXPIRES_WARN_SECONDS)
@ -138,18 +139,20 @@ def _generate_and_write_metadata(rolename, metadata_filename, write_partial,
roleinfo['expires'],
roleinfo['delegations'],
consistent_snapshot)
if rolename == 'targets':
_log_warning_if_expires_soon(TARGETS_FILENAME, roleinfo['expires'],
TARGETS_EXPIRES_WARN_SECONDS)
elif rolename == 'snapshot':
root_filename = filenames['root']
targets_filename = filenames['targets']
root_filename = ROOT_FILENAME[:-len(METADATA_EXTENSION)]
targets_filename = TARGETS_FILENAME[:-len(METADATA_EXTENSION)]
metadata = generate_snapshot_metadata(metadata_directory,
roleinfo['version'],
roleinfo['expires'], root_filename,
targets_filename,
consistent_snapshot)
_log_warning_if_expires_soon(SNAPSHOT_FILENAME, roleinfo['expires'],
SNAPSHOT_EXPIRES_WARN_SECONDS)
@ -158,8 +161,7 @@ def _generate_and_write_metadata(rolename, metadata_filename, write_partial,
snapshot_filename = filenames['snapshot']
metadata = generate_timestamp_metadata(snapshot_filename,
roleinfo['version'],
roleinfo['expires'],
snapshot_compressions)
roleinfo['expires'])
_log_warning_if_expires_soon(TIMESTAMP_FILENAME, roleinfo['expires'],
TIMESTAMP_EXPIRES_WARN_SECONDS)
@ -179,12 +181,21 @@ def _generate_and_write_metadata(rolename, metadata_filename, write_partial,
status = tuf.sig.get_signature_status(temp_signable, rolename)
if len(status['good_sigs']) == 0:
metadata['version'] = metadata['version'] + 1
roleinfo = tuf.roledb.get_roleinfo(rolename)
roleinfo['version'] = roleinfo['version'] + 1
tuf.roledb.update_roleinfo(rolename, roleinfo)
signable = sign_metadata(metadata, roleinfo['signing_keyids'],
metadata_filename)
# non-partial write()
else:
# If writing a new version of 'rolename,' increment its version number in
# both the metadata file and roledb (required so that snapshot references
# the latest version).
if tuf.sig.verify(signable, rolename) and not roleinfo['partial_loaded']:
metadata['version'] = metadata['version'] + 1
roleinfo = tuf.roledb.get_roleinfo(rolename)
roleinfo['version'] = roleinfo['version'] + 1
tuf.roledb.update_roleinfo(rolename, roleinfo)
signable = sign_metadata(metadata, roleinfo['signing_keyids'],
metadata_filename)
@ -193,16 +204,16 @@ def _generate_and_write_metadata(rolename, metadata_filename, write_partial,
if tuf.sig.verify(signable, rolename) or write_partial:
_remove_invalid_and_duplicate_signatures(signable)
compressions = roleinfo['compressions']
filename = write_metadata_file(signable, metadata_filename, compressions,
filename = write_metadata_file(signable, metadata_filename,
metadata['version'], compression_algorithms,
consistent_snapshot)
# The root and timestamp files should also be written without a digest if
# 'consistent_snaptshots' is True. Client may request a timestamp and root
# file without knowing its digest and file size.
# The root and timestamp files should also be written without a version
# number prepended if 'consistent_snaptshot' is True. Clients may request
# a timestamp and root file without knowing their version numbers.
if rolename == 'root' or rolename == 'timestamp':
write_metadata_file(signable, metadata_filename, compressions,
consistent_snapshot=False)
write_metadata_file(signable, metadata_filename, metadata['version'],
compression_algorithms, consistent_snapshot=False)
# 'signable' contains an invalid threshold of signatures.
@ -210,7 +221,7 @@ def _generate_and_write_metadata(rolename, metadata_filename, write_partial,
message = 'Not enough signatures for ' + repr(metadata_filename)
raise tuf.UnsignedMetadataError(message, signable)
return signable, filename
return signable, filename
@ -434,15 +445,16 @@ def _delete_obsolete_metadata(metadata_directory, snapshot_metadata,
metadata_name = \
metadata_path[len(metadata_directory):].lstrip(os.path.sep)
# Strip the digest if 'consistent_snapshot' is True.
# Example: 'targets/unclaimed/13df98ab0.django.json' -->
# Strip the version number if 'consistent_snapshot' is True. Example:
# 'targets/unclaimed/10.django.json' -->
# 'targets/unclaimed/django.json'. Consistent and non-consistent
# metadata might co-exist if write() and write(consistent_snapshot=True)
# are mixed, so ensure only 'digest.filename' metadata is stripped.
embeded_digest = None
# metadata might co-exist if write() and
# write(consistent_snapshot=True) are mixed, so ensure only
# '<version_number>.filename' metadata is stripped.
embeded_version_number = None
if metadata_name not in snapshot_metadata['meta']:
metadata_name, embeded_digest = \
_strip_consistent_snapshot_digest(metadata_name, consistent_snapshot)
metadata_name, embeded_version_number = \
_strip_consistent_snapshot_version_number(metadata_name, consistent_snapshot)
# Strip filename extensions. The role database does not include the
# metadata extension.
@ -452,71 +464,68 @@ def _delete_obsolete_metadata(metadata_directory, snapshot_metadata,
metadata_name = metadata_name[:-len(metadata_extension)]
# Delete the metadata file if it does not exist in 'tuf.roledb'.
# 'repository_tool.py' might have marked 'metadata_name' as removed, but
# its metadata file is not actually deleted yet. Do it now.
# 'repository_tool.py' might have removed 'metadata_name,'
# but its metadata file is not actually deleted yet. Do it now.
if not tuf.roledb.role_exists(metadata_name):
logger.info('Removing outdated metadata: ' + repr(metadata_path))
os.remove(metadata_path)
# Delete outdated consistent snapshots. snapshot metadata includes
# the file extension of roles.
if consistent_snapshot and embeded_digest is not None:
# Delete outdated consistent snapshots. Snapshot metadata includes the
# file extension of roles. TODO: Should we leave it up to integrators
# to remove outdated consistent snapshots?
"""
if consistent_snapshot and embeded_version_number is not None:
file_hashes = list(snapshot_metadata['meta'][metadata_name_extension] \
['hashes'].values())
if embeded_digest not in file_hashes:
logger.info('Removing outdated metadata: ' + repr(metadata_path))
os.remove(metadata_path)
"""
def _get_written_metadata_and_digests(metadata_signable):
def _get_written_metadata(metadata_signable):
"""
Non-public function that returns the actual content of written metadata and
its digest.
Non-public function that returns the actual content of written metadata.
"""
# Explicitly specify the JSON separators for Python 2 + 3 consistent.
# Explicitly specify the JSON separators for Python 2 + 3 consistency.
written_metadata_content = \
json.dumps(metadata_signable, indent=1, separators=(',', ': '),
sort_keys=True).encode('utf-8')
written_metadata_digests = {}
for hash_algorithm in tuf.conf.REPOSITORY_HASH_ALGORITHMS:
digest_object = tuf.hash.digest(hash_algorithm)
digest_object.update(written_metadata_content)
written_metadata_digests.update({hash_algorithm: digest_object.hexdigest()})
return written_metadata_content, written_metadata_digests
return written_metadata_content
def _strip_consistent_snapshot_digest(metadata_filename, consistent_snapshot):
def _strip_consistent_snapshot_version_number(metadata_filename,
consistent_snapshot):
"""
Strip from 'metadata_filename' any digest data (in the expected
'{dirname}/digest.filename' format) that it may contain, and return it.
Strip from 'metadata_filename' any version data (in the expected
'{dirname}/version_number.filename' format) that it may contain, and return
the stripped filename and its version number as a tuple.
"""
embeded_digest = ''
embeded_version_number = ''
# Strip the digest if 'consistent_snapshot' is True.
# Example: 'targets/unclaimed/13df98ab0.django.json' -->
# Strip the version number if 'consistent_snapshot' is True.
# Example: 'targets/unclaimed/10.django.json' -->
# 'targets/unclaimed/django.json'
if consistent_snapshot:
dirname, basename = os.path.split(metadata_filename)
embeded_digest = basename[:basename.find('.')]
embeded_version_number = basename[:basename.find('.')]
# Ensure the digest, including the period, is stripped.
# Ensure the version number, including the period, is stripped.
basename = basename[basename.find('.') + 1:]
metadata_filename = os.path.join(dirname, basename)
return metadata_filename, embeded_digest
return metadata_filename, embeded_version_number
@ -524,8 +533,8 @@ def _strip_consistent_snapshot_digest(metadata_filename, consistent_snapshot):
def _load_top_level_metadata(repository, top_level_filenames):
"""
Load the metadata of the Root, Timestamp, Targets, and Snapshot roles.
At a minimum, the Root role must exist and successfully load.
Load the metadata of the Root, Timestamp, Targets, and Snapshot roles. At a
minimum, the Root role must exist and successfully load.
"""
root_filename = top_level_filenames[ROOT_FILENAME]
@ -538,7 +547,8 @@ def _load_top_level_metadata(repository, top_level_filenames):
snapshot_metadata = None
timestamp_metadata = None
# Load 'root.json'. A Root role file without a digest is always written.
# Load 'root.json'. A Root role file without a version number is always
# written.
if os.path.exists(root_filename):
# Initialize the key and role metadata of the top-level roles.
signable = tuf.util.load_json_file(root_filename)
@ -554,11 +564,11 @@ def _load_top_level_metadata(repository, top_level_filenames):
if signature not in roleinfo['signatures']:
roleinfo['signatures'].append(signature)
if os.path.exists(root_filename+'.gz'):
if os.path.exists(root_filename + '.gz'):
roleinfo['compressions'].append('gz')
# By default, roleinfo['partial_loaded'] of top-level roles should be set to
# False in 'create_roledb_from_root_metadata()'. Update this field, if
# By default, roleinfo['partial_loaded'] of top-level roles should be set
# to False in 'create_roledb_from_root_metadata()'. Update this field, if
# necessary, now that we have its signable object.
if _metadata_is_partially_loaded('root', signable, roleinfo):
roleinfo['partial_loaded'] = True
@ -572,11 +582,11 @@ def _load_top_level_metadata(repository, top_level_filenames):
consistent_snapshot = root_metadata['consistent_snapshot']
else:
message = 'Cannot load the required root file: '+repr(root_filename)
message = 'Cannot load the required root file: ' + repr(root_filename)
raise tuf.RepositoryError(message)
# Load 'timestamp.json'. A Timestamp role file without a digest is always
# written.
# Load 'timestamp.json'. A Timestamp role file without a version number is
# always written.
if os.path.exists(timestamp_filename):
signable = tuf.util.load_json_file(timestamp_filename)
timestamp_metadata = signable['signed']
@ -604,10 +614,9 @@ def _load_top_level_metadata(repository, top_level_filenames):
# Load 'snapshot.json'. A consistent snapshot of Snapshot must be calculated
# if 'consistent_snapshot' is True.
if consistent_snapshot:
snapshot_hashes = timestamp_metadata['meta'][SNAPSHOT_FILENAME]['hashes']
snapshot_digest = random.choice(list(snapshot_hashes.values()))
snapshot_version = timestamp_metadata['meta'][SNAPSHOT_FILENAME]['version']
dirname, basename = os.path.split(snapshot_filename)
snapshot_filename = os.path.join(dirname, snapshot_digest + '.' + basename)
snapshot_filename = os.path.join(dirname, str(snapshot_version) + '.' + basename)
if os.path.exists(snapshot_filename):
signable = tuf.util.load_json_file(snapshot_filename)
@ -620,7 +629,7 @@ def _load_top_level_metadata(repository, top_level_filenames):
roleinfo = tuf.roledb.get_roleinfo('snapshot')
roleinfo['expires'] = snapshot_metadata['expires']
roleinfo['version'] = snapshot_metadata['version']
if os.path.exists(snapshot_filename+'.gz'):
if os.path.exists(snapshot_filename + '.gz'):
roleinfo['compressions'].append('gz')
if _metadata_is_partially_loaded('snapshot', signable, roleinfo):
@ -634,13 +643,12 @@ def _load_top_level_metadata(repository, top_level_filenames):
else:
pass
# Load 'targets.json'. A consistent snapshot of Targets must be calculated if
# 'consistent_snapshot' is True.
# Load 'targets.json'. A consistent snapshot of the Targets role must be
# calculated if 'consistent_snapshot' is True.
if consistent_snapshot:
targets_hashes = snapshot_metadata['meta'][TARGETS_FILENAME]['hashes']
targets_digest = random.choice(list(targets_hashes.values()))
targets_version = snapshot_metadata['meta'][TARGETS_FILENAME]['version']
dirname, basename = os.path.split(targets_filename)
targets_filename = os.path.join(dirname, targets_digest + '.' + basename)
targets_filename = os.path.join(dirname, str(targets_version) + '.' + basename)
if os.path.exists(targets_filename):
signable = tuf.util.load_json_file(targets_filename)
@ -657,7 +665,7 @@ def _load_top_level_metadata(repository, top_level_filenames):
roleinfo['version'] = targets_metadata['version']
roleinfo['expires'] = targets_metadata['expires']
roleinfo['delegations'] = targets_metadata['delegations']
if os.path.exists(targets_filename+'.gz'):
if os.path.exists(targets_filename + '.gz'):
roleinfo['compressions'].append('gz')
if _metadata_is_partially_loaded('targets', signable, roleinfo):
@ -1265,6 +1273,47 @@ def get_metadata_fileinfo(filename, custom=None):
def get_metadata_versioninfo(rolename):
"""
<Purpose>
Retrieve the version information of 'rolename'. The object returned
conforms to 'tuf.formats.VERSIONINFO_SCHEMA'. The information
generated for 'rolename' is stored in 'snapshot.json'.
The versioninfo object returned has the form:
versioninfo = {'version': 14}
<Arguments>
rolename:
The metadata role whose versioninfo is needed. It must exist, otherwise
a 'tuf.UnknownRoleError' exception is raised.
<Exceptions>
tuf.FormatError, if 'rolename' is improperly formatted.
tuf.UnknownRoleError, if 'rolename' does not exist.
<Side Effects>
None.
<Returns>
A dictionary conformant to 'tuf.formats.VERSIONINFO_SCHEMA'. This
dictionary contains the version number of 'rolename'.
"""
# Does 'rolename' have the correct format?
# Ensure the arguments have the appropriate number of objects and object
# types, and that all dict keys are properly named.
tuf.formats.ROLENAME_SCHEMA.check_match(rolename)
roleinfo = tuf.roledb.get_roleinfo(rolename)
versioninfo = {'version': roleinfo['version']}
return versioninfo
def get_target_hash(target_filepath):
"""
@ -1298,7 +1347,8 @@ def get_target_hash(target_filepath):
def generate_root_metadata(version, expiration_date, consistent_snapshot):
def generate_root_metadata(version, expiration_date, consistent_snapshot,
compression_algorithms=['gz']):
"""
<Purpose>
Create the root metadata. 'tuf.roledb.py' and 'tuf.keydb.py' are read and
@ -1319,6 +1369,11 @@ def generate_root_metadata(version, expiration_date, consistent_snapshot):
Boolean. If True, a file digest is expected to be prepended to the
filename of any target file located in the targets directory. Each digest
is stripped from the target filename and listed in the snapshot metadata.
compression_algorithms:
A list of compression algorithms to use when generating the compressed
metadata files for the repository. The root file specifies the
algorithms used by the repository.
<Exceptions>
tuf.FormatError, if the generated root metadata object could not
@ -1341,6 +1396,7 @@ def generate_root_metadata(version, expiration_date, consistent_snapshot):
tuf.formats.METADATAVERSION_SCHEMA.check_match(version)
tuf.formats.ISO8601_DATETIME_SCHEMA.check_match(expiration_date)
tuf.formats.BOOLEAN_SCHEMA.check_match(consistent_snapshot)
tuf.formats.COMPRESSIONS_SCHEMA.check_match(compression_algorithms)
# The role and key dictionaries to be saved in the root metadata object.
# Conformant to 'ROLEDICT_SCHEMA' and 'KEYDICT_SCHEMA', respectively.
@ -1398,7 +1454,8 @@ def generate_root_metadata(version, expiration_date, consistent_snapshot):
# Generate the root metadata object.
root_metadata = tuf.formats.RootFile.make_metadata(version, expiration_date,
keydict, roledict,
consistent_snapshot)
consistent_snapshot,
compression_algorithms)
return root_metadata
@ -1490,7 +1547,7 @@ def generate_targets_metadata(targets_directory, target_files, version,
# Note: join() discards 'targets_directory' if 'target' contains a leading
# path separator (i.e., is treated as an absolute path).
target_path = os.path.join(targets_directory, target.lstrip(os.sep))
# Ensure all target files listed in 'target_files' exist. If just one of
# these files does not exist, raise an exception.
if not os.path.exists(target_path):
@ -1594,30 +1651,20 @@ def generate_snapshot_metadata(metadata_directory, version, expiration_date,
metadata_directory = _check_directory(metadata_directory)
# Retrieve the fileinfo of 'root.json' and 'targets.json'. This file
# information includes data such as file length, hashes of the file, etc.
filedict = {}
filedict[ROOT_FILENAME] = get_metadata_fileinfo(root_filename)
filedict[TARGETS_FILENAME] = get_metadata_fileinfo(targets_filename)
# Retrieve the versioninfo of 'root.json' and 'targets.json'. The
# versioninfo contains the version number of these roles.
versiondict = {}
versiondict[ROOT_FILENAME] = get_metadata_versioninfo(root_filename)
versiondict[TARGETS_FILENAME] = get_metadata_versioninfo(targets_filename)
# Add compressed versions of the 'targets.json' and 'root.json' metadata,
# if they exist.
for extension in SUPPORTED_COMPRESSION_EXTENSIONS:
compressed_root_filename = root_filename+extension
compressed_targets_filename = targets_filename+extension
# If the compressed versions of the root and targets metadata is found,
# add their file attributes to 'filedict'.
if os.path.exists(compressed_root_filename):
filedict[ROOT_FILENAME+extension] = \
get_metadata_fileinfo(compressed_root_filename)
if os.path.exists(compressed_targets_filename):
filedict[TARGETS_FILENAME+extension] = \
get_metadata_fileinfo(compressed_targets_filename)
# We previously also stored the compressed versions of roles in
# snapshot.json, however, this is no longer needed as their hashes and
# lengths are no longer used and their version numbers match the uncompressed
# role files.
# Walk the 'targets/' directory and generate the fileinfo of all the role
# files found. This information is stored in the 'meta' field of the snapshot
# metadata object.
# Walk the 'targets/' directory and generate the versioninfo of all the role
# files found. This information is stored in the 'meta' field of the
# snapshot metadata object.
targets_metadata = os.path.join(metadata_directory, 'targets')
if os.path.exists(targets_metadata) and os.path.isdir(targets_metadata):
for directory_path, junk_directories, files in os.walk(targets_metadata):
@ -1628,27 +1675,26 @@ def generate_snapshot_metadata(metadata_directory, version, expiration_date,
metadata_name = \
metadata_path[len(metadata_directory):].lstrip(os.path.sep)
# Strip the digest if 'consistent_snapshot' is True.
# Example: 'targets/unclaimed/13df98ab0.django.json' -->
# Strip the version number if 'consistent_snapshot' is True.
# Example: 'targets/unclaimed/10.django.json' -->
# 'targets/unclaimed/django.json'
metadata_name, digest_junk = \
_strip_consistent_snapshot_digest(metadata_name, consistent_snapshot)
metadata_name, version_number_junk = \
_strip_consistent_snapshot_version_number(metadata_name, consistent_snapshot)
# All delegated roles are added to the snapshot file, including
# compressed versions.
# All delegated roles are added to the snapshot file.
for metadata_extension in METADATA_EXTENSIONS:
if metadata_name.endswith(metadata_extension):
rolename = metadata_name[:-len(metadata_extension)]
# Obsolete role files may still be found. Ensure only roles loaded
# in the roledb are included in the snapshot metadata.
# in the roledb are included in the Snapshot metadata.
if tuf.roledb.role_exists(rolename):
filedict[metadata_name] = get_metadata_fileinfo(metadata_path)
versiondict[metadata_name] = get_metadata_versioninfo(rolename)
# Generate the snapshot metadata object.
# Generate the Snapshot metadata object.
snapshot_metadata = tuf.formats.SnapshotFile.make_metadata(version,
expiration_date,
filedict)
expiration_date,
versiondict)
return snapshot_metadata
@ -1656,8 +1702,7 @@ def generate_snapshot_metadata(metadata_directory, version, expiration_date,
def generate_timestamp_metadata(snapshot_filename, version,
expiration_date, compressions=()):
def generate_timestamp_metadata(snapshot_filename, version, expiration_date):
"""
<Purpose>
Generate the timestamp metadata object. The 'snapshot.json' file must
@ -1677,12 +1722,6 @@ def generate_timestamp_metadata(snapshot_filename, version,
The expiration date of the metadata file, conformant to
'tuf.formats.ISO8601_DATETIME_SCHEMA'.
compressions:
Compression extensions (e.g., 'gz'). If 'snapshot.json' is also saved in
compressed form, these compression extensions should be stored in
'compressions' so the compressed timestamp files can be added to the
timestamp metadata object.
<Exceptions>
tuf.FormatError, if the generated timestamp metadata object cannot be
formatted correctly, or one of the arguments is improperly formatted.
@ -1701,34 +1740,20 @@ def generate_timestamp_metadata(snapshot_filename, version,
tuf.formats.PATH_SCHEMA.check_match(snapshot_filename)
tuf.formats.METADATAVERSION_SCHEMA.check_match(version)
tuf.formats.ISO8601_DATETIME_SCHEMA.check_match(expiration_date)
tuf.formats.COMPRESSIONS_SCHEMA.check_match(compressions)
# Retrieve the fileinfo of the snapshot metadata file.
# This file information contains hashes, file length, custom data, etc.
fileinfo = {}
fileinfo[SNAPSHOT_FILENAME] = get_metadata_fileinfo(snapshot_filename)
# Retrieve the versioninfo of the Snapshot metadata file.
versioninfo = {}
versioninfo[SNAPSHOT_FILENAME] = get_metadata_versioninfo('snapshot')
# Save the fileinfo of the compressed versions of 'timestamp.json'
# in 'fileinfo'. Log the files included in 'fileinfo'.
for file_extension in compressions:
if not len(file_extension):
continue
compressed_filename = snapshot_filename + '.' + file_extension
try:
compressed_fileinfo = get_metadata_fileinfo(compressed_filename)
except:
logger.warning('Cannot get fileinfo about ' + repr(compressed_filename))
else:
logger.info('Including fileinfo about ' + repr(compressed_filename))
fileinfo[SNAPSHOT_FILENAME + '.' + file_extension] = compressed_fileinfo
# We previously saved the versioninfo of the compressed versions of
# 'snapshot.json' in 'versioninfo'. Since version numbers are now stored,
# the version numbers of compressed roles do not change and can thus be
# excluded.
# Generate the timestamp metadata object.
timestamp_metadata = tuf.formats.TimestampFile.make_metadata(version,
expiration_date,
fileinfo)
versioninfo)
return timestamp_metadata
@ -1824,7 +1849,8 @@ def sign_metadata(metadata_object, keyids, filename):
def write_metadata_file(metadata, filename, compressions, consistent_snapshot):
def write_metadata_file(metadata, filename, version_number,
compression_algorithms, consistent_snapshot):
"""
<Purpose>
If necessary, write the 'metadata' signable object to 'filename', and the
@ -1840,12 +1866,18 @@ def write_metadata_file(metadata, filename, compressions, consistent_snapshot):
filename:
The filename of the metadata to be written (e.g., 'root.json').
If a compression algorithm is specified in 'compressions', the
If a compression algorithm is specified in 'compression_algorithms', the
compression extention is appended to 'filename'.
compressions:
Specify the algorithms, as a list of strings, used to compress the file;
The only currently available compression option is 'gz' (gzip).
version_number:
The version number of the metadata file to be written. The version
number is needed for consistent snapshots, which prepend the version
number to 'filename'.
compression_algorithms:
Specify the algorithms, as a list of strings, used to compress the
'metadata'; The only currently available compression option is 'gz'
(gzip).
consistent_snapshot:
Boolean that determines whether the metadata file's digest should be
@ -1872,37 +1904,43 @@ def write_metadata_file(metadata, filename, compressions, consistent_snapshot):
# Raise 'tuf.FormatError' if the check fails.
tuf.formats.SIGNABLE_SCHEMA.check_match(metadata)
tuf.formats.PATH_SCHEMA.check_match(filename)
tuf.formats.COMPRESSIONS_SCHEMA.check_match(compressions)
tuf.formats.METADATAVERSION_SCHEMA.check_match(version_number)
tuf.formats.COMPRESSIONS_SCHEMA.check_match(compression_algorithms)
tuf.formats.BOOLEAN_SCHEMA.check_match(consistent_snapshot)
# Verify the directory of 'filename', and convert 'filename' to its absolute
# path so that temporary files are moved to their expected destinations.
filename = os.path.abspath(filename)
written_filename = filename
written_consistent_filename = None
_check_directory(os.path.dirname(filename))
consistent_filenames = []
# Generate the actual metadata file content of 'metadata'. Metadata is
# saved as json and includes formatting, such as indentation and sorted
# saved as JSON and includes formatting, such as indentation and sorted
# objects. The new digest of 'metadata' is also calculated to help determine
# if re-saving is required.
file_content, new_digests = _get_written_metadata_and_digests(metadata)
file_content = _get_written_metadata(metadata)
if consistent_snapshot:
for new_digest in six.itervalues(new_digests):
dirname, basename = os.path.split(filename)
digest_and_filename = new_digest + '.' + basename
consistent_filenames.append(os.path.join(dirname, digest_and_filename))
written_filename = consistent_filenames.pop()
dirname, basename = os.path.split(filename)
version_and_filename = str(version_number) + '.' + basename
written_consistent_filename = os.path.join(dirname, version_and_filename)
# Verify whether new metadata needs to be written (i.e., has not been
# previously written or has changed.
write_new_metadata = False
# Has the uncompressed metadata changed? Does it exist? If so, set
# 'write_compressed_version' to True so that it is written.
# compressed metadata should only be written if it does not exist or the
# 'write_compressed_version' to 'True' so that it is written.
# Compressed metadata should only be written if it does not exist or the
# uncompressed version has changed).
new_digests = {}
hash_algorithms = tuf.conf.REPOSITORY_HASH_ALGORITHMS
for hash_algorithm in hash_algorithms:
digest_object = tuf.hash.digest(hash_algorithm)
digest_object.update(file_content)
new_digests.update({hash_algorithm: digest_object.hexdigest()})
try:
file_length_junk, old_digests = tuf.util.get_file_details(written_filename)
if old_digests != new_digests:
@ -1926,25 +1964,24 @@ def write_metadata_file(metadata, filename, compressions, consistent_snapshot):
file_object.write(file_content)
logger.debug('Saving ' + repr(written_filename))
file_object.move(written_filename)
for consistent_filename in consistent_filenames:
logger.info('Linking ' + repr(consistent_filename))
os.link(written_filename, consistent_filename)
if consistent_snapshot:
logger.info('Linking ' + repr(written_consistent_filename))
os.link(written_filename, written_consistent_filename)
# Generate the compressed versions of 'metadata', if necessary. A compressed
# file may be written (without needing to write the uncompressed version) if
# the repository maintainer adds compression after writing the uncompressed
# version.
for compression in compressions:
for compression_algorithm in compression_algorithms:
file_object = None
# Ignore the empty string that signifies non-compression. The uncompressed
# file was previously written above, if necessary.
if not len(compression):
if not len(compression_algorithm):
continue
elif compression == 'gz':
elif compression_algorithm == 'gz':
file_object = tuf.util.TempFile()
compressed_filename = filename + '.gz'
@ -1959,7 +1996,7 @@ def write_metadata_file(metadata, filename, compressions, consistent_snapshot):
gzip_object.close()
else:
raise tuf.FormatError('Unknown compression algorithm: '+repr(compression))
raise tuf.FormatError('Unknown compression algorithm: ' + repr(compressio_algorithm))
# Save the compressed version, ensuring an unchanged file is not re-saved.
# Re-saving the same compressed version may cause its digest to unexpectedly

View file

@ -178,7 +178,8 @@ def __init__(self, repository_directory, metadata_directory, targets_directory):
def write(self, write_partial=False, consistent_snapshot=False):
def write(self, write_partial=False, consistent_snapshot=False,
compression_algorithms=['gz']):
"""
<Purpose>
Write all the JSON Metadata objects to their corresponding files.
@ -196,10 +197,15 @@ def write(self, write_partial=False, consistent_snapshot=False):
consistent_snapshot:
A boolean indicating whether written metadata and target files should
include a digest in the filename (i.e., <digest>.root.json,
<digest>.targets.json.gz, <digest>.README.json, where <digest> is the
file's SHA256 digest. Example:
1f4e35a60c8f96d439e27e858ce2869c770c1cdd54e1ef76657ceaaf01da18a3.root.json'
include a version number in the filename (i.e.,
<version_number>.root.json, <version_number>.targets.json.gz,
<version_number>.README.json, where <version_number> is the file's
SHA256 digest. Example: 13.root.json'
compression_algorithms:
A list of compression algorithms. Each of these algorithms will be
used to compress all of the metadata available on the repository.
By default, all metadata is compressed with gzip.
<Exceptions>
tuf.UnsignedMetadataError, if any of the top-level and delegated roles do
@ -217,7 +223,9 @@ def write(self, write_partial=False, consistent_snapshot=False):
# types, and that all dict keys are properly named.
# Raise 'tuf.FormatError' if any are improperly formatted.
tuf.formats.BOOLEAN_SCHEMA.check_match(write_partial)
tuf.formats.BOOLEAN_SCHEMA.check_match(consistent_snapshot)
tuf.formats.BOOLEAN_SCHEMA.check_match(consistent_snapshot)
tuf.formats.COMPRESSIONS_SCHEMA.check_match(compression_algorithms)
# At this point the tuf.keydb and tuf.roledb stores must be fully
# populated, otherwise write() throwns a 'tuf.UnsignedMetadataError'
@ -2021,7 +2029,7 @@ def delegate(self, rolename, public_keys, list_of_targets, threshold=1,
full_rolename = self._rolename + '/' + rolename
if tuf.roledb.role_exists(full_rolename):
raise tuf.Error(repr(full_rolename) + ' already delegated.')
raise tuf.Error(repr(rolename) + ' already delegated.')
# Keep track of the valid keyids (added to the new Targets object) and their
# keydicts (added to this Targets delegations).
@ -2696,10 +2704,9 @@ def load_repository(repository_directory):
filenames = repo_lib.get_metadata_filenames(metadata_directory)
# The Root file is always available without a consistent snapshots digest
# attached to the filename. Store the 'consistent_snapshot' value read the
# loaded Root file so that other metadata files may be located.
# 'consistent_snapshot' value.
# The Root file is always available without a version number (a consistent
# snapshot) attached to the filename. Store the 'consistent_snapshot' value
# and read the loaded Root file so that other metadata files may be located.
consistent_snapshot = False
# Load the metadata of the top-level roles (i.e., Root, Timestamp, Targets,
@ -2726,11 +2733,11 @@ def load_repository(repository_directory):
metadata_name = \
metadata_path[len(metadata_directory):].lstrip(os.path.sep)
# Strip the digest if 'consistent_snapshot' is True.
# Example: 'targets/unclaimed/13df98ab0.django.json' -->
# Strip the version number if 'consistent_snapshot' is True.
# Example: 'targets/unclaimed/10.django.json' -->
# 'targets/unclaimed/django.json'
metadata_name, digest_junk = \
repo_lib._strip_consistent_snapshot_digest(metadata_name,
metadata_name, version_number_junk = \
repo_lib._strip_consistent_snapshot_version_number(metadata_name,
consistent_snapshot)
if metadata_name.endswith(METADATA_EXTENSION):
@ -2739,10 +2746,10 @@ def load_repository(repository_directory):
else:
continue
# Keep a store metadata previously loaded metadata to prevent
# re-loading duplicate versions. Duplicate versions may occur with
# consistent_snapshot, where the same metadata may be available in
# 'consistent_snapshot', where the same metadata may be available in
# multiples files (the different hash is included in each filename.
if metadata_name in loaded_metadata:
continue