mirror of
https://github.com/theupdateframework/python-tuf
synced 2026-05-24 10:08:28 +00:00
tests: Remove sleeps from indefinite freeze tests
Instead of sleeping, mock time.time() so Updater thinks it lives in the future. Signed-off-by: Jussi Kukkonen <jkukkonen@vmware.com>
This commit is contained in:
parent
e7ce873f12
commit
ca048a2744
1 changed files with 53 additions and 77 deletions
|
|
@ -44,6 +44,8 @@
|
|||
from __future__ import division
|
||||
from __future__ import unicode_literals
|
||||
|
||||
import datetime
|
||||
import unittest.mock as mock
|
||||
import os
|
||||
import time
|
||||
import tempfile
|
||||
|
|
@ -265,8 +267,7 @@ def test_with_tuf(self):
|
|||
# Load the repository
|
||||
repository = repo_tool.load_repository(self.repository_directory)
|
||||
|
||||
# Load the timestamp and snapshot keys, since we will be signing a new
|
||||
# timestamp and a new snapshot file.
|
||||
# Load the snapshot and timestamp keys
|
||||
key_file = os.path.join(self.keystore_directory, 'timestamp_key')
|
||||
timestamp_private = repo_tool.import_ed25519_privatekey_from_file(key_file,
|
||||
'password')
|
||||
|
|
@ -276,17 +277,11 @@ def test_with_tuf(self):
|
|||
'password')
|
||||
repository.snapshot.load_signing_key(snapshot_private)
|
||||
|
||||
# Expire snapshot in 10s. This should be far enough into the future that we
|
||||
# haven't reached it before the first refresh validates timestamp expiry.
|
||||
# We want a successful refresh before expiry, then a second refresh after
|
||||
# expiry (which we then expect to raise an exception due to expired
|
||||
# metadata).
|
||||
expiry_time = time.time() + 10
|
||||
datetime_object = tuf.formats.unix_timestamp_to_datetime(int(expiry_time))
|
||||
|
||||
repository.snapshot.expiration = datetime_object
|
||||
|
||||
# Now write to the repository.
|
||||
# sign snapshot with expiry in near future (earlier than e.g. timestamp)
|
||||
expiry = int(time.time() + 60*60)
|
||||
repository.snapshot.expiration = tuf.formats.unix_timestamp_to_datetime(
|
||||
expiry)
|
||||
repository.mark_dirty(['snapshot', 'timestamp'])
|
||||
repository.writeall()
|
||||
|
||||
# And move the staged metadata to the "live" metadata.
|
||||
|
|
@ -297,30 +292,24 @@ def test_with_tuf(self):
|
|||
# Refresh metadata on the client. For this refresh, all data is not expired.
|
||||
logger.info('Test: Refreshing #1 - Initial metadata refresh occurring.')
|
||||
self.repository_updater.refresh()
|
||||
logger.info('Test: Refreshed #1 - Initial metadata refresh completed '
|
||||
'successfully. Now sleeping until snapshot metadata expires.')
|
||||
|
||||
# Sleep until expiry_time ('repository.snapshot.expiration')
|
||||
time.sleep(max(0, expiry_time - time.time() + 1))
|
||||
logger.info('Test: Refreshing #2 - refresh after local snapshot expiry.')
|
||||
|
||||
logger.info('Test: Refreshing #2 - Now trying to refresh again after local'
|
||||
' snapshot expiry.')
|
||||
# mock current time to one second after snapshot expiry
|
||||
mock_time = mock.Mock()
|
||||
mock_time.return_value = expiry + 1
|
||||
with mock.patch('time.time', mock_time):
|
||||
try:
|
||||
self.repository_updater.refresh() # We expect this to fail!
|
||||
|
||||
try:
|
||||
self.repository_updater.refresh() # We expect this to fail!
|
||||
except tuf.exceptions.ExpiredMetadataError:
|
||||
logger.info('Test: Refresh #2 - failed as expected. Expired local'
|
||||
' snapshot case generated a tuf.exceptions.ExpiredMetadataError'
|
||||
' exception as expected. Test pass.')
|
||||
|
||||
except tuf.exceptions.ExpiredMetadataError:
|
||||
logger.info('Test: Refresh #2 - failed as expected. Expired local'
|
||||
' snapshot case generated a tuf.exceptions.ExpiredMetadataError'
|
||||
' exception as expected. Test pass.')
|
||||
|
||||
# I think that I only expect tuf.ExpiredMetadata error here. A
|
||||
# NoWorkingMirrorError indicates something else in this case - unavailable
|
||||
# repo, for example.
|
||||
else:
|
||||
|
||||
self.fail('TUF failed to detect expired stale snapshot metadata. Freeze'
|
||||
' attack successful.')
|
||||
else:
|
||||
self.fail('TUF failed to detect expired stale snapshot metadata. Freeze'
|
||||
' attack successful.')
|
||||
|
||||
|
||||
|
||||
|
|
@ -355,7 +344,7 @@ def test_with_tuf(self):
|
|||
# We cannot set the timestamp expiration with
|
||||
# 'repository.timestamp.expiration = ...' with already-expired timestamp
|
||||
# metadata because of consistency checks that occur during that assignment.
|
||||
expiry_time = time.time() + 1
|
||||
expiry_time = time.time() + 60*60
|
||||
datetime_object = tuf.formats.unix_timestamp_to_datetime(int(expiry_time))
|
||||
repository.timestamp.expiration = datetime_object
|
||||
repository.writeall()
|
||||
|
|
@ -365,29 +354,21 @@ def test_with_tuf(self):
|
|||
shutil.copytree(os.path.join(self.repository_directory, 'metadata.staged'),
|
||||
os.path.join(self.repository_directory, 'metadata'))
|
||||
|
||||
# Wait just long enough for the timestamp metadata (which is now both on
|
||||
# the repository and on the client) to expire.
|
||||
time.sleep(max(0, expiry_time - time.time() + 1))
|
||||
# mock current time to one second after timestamp expiry
|
||||
mock_time = mock.Mock()
|
||||
mock_time.return_value = expiry_time + 1
|
||||
with mock.patch('time.time', mock_time):
|
||||
try:
|
||||
self.repository_updater.refresh() # We expect NoWorkingMirrorError.
|
||||
|
||||
# Try to refresh top-level metadata on the client. Since we're already past
|
||||
# 'repository.timestamp.expiration', the TUF client is expected to detect
|
||||
# that timestamp metadata is outdated and refuse to continue the update
|
||||
# process.
|
||||
try:
|
||||
self.repository_updater.refresh() # We expect NoWorkingMirrorError.
|
||||
except tuf.exceptions.NoWorkingMirrorError as e:
|
||||
# Make sure the contained error is ExpiredMetadataError
|
||||
for mirror_url, mirror_error in six.iteritems(e.mirror_errors):
|
||||
self.assertTrue(isinstance(mirror_error, tuf.exceptions.ExpiredMetadataError))
|
||||
|
||||
except tuf.exceptions.NoWorkingMirrorError as e:
|
||||
# NoWorkingMirrorError indicates that we did not find valid, unexpired
|
||||
# metadata at any mirror. That exception class preserves the errors from
|
||||
# each mirror. We now assert that for each mirror, the particular error
|
||||
# detected was that metadata was expired (the timestamp we manually
|
||||
# expired).
|
||||
for mirror_url, mirror_error in six.iteritems(e.mirror_errors):
|
||||
self.assertTrue(isinstance(mirror_error, tuf.exceptions.ExpiredMetadataError))
|
||||
|
||||
else:
|
||||
self.fail('TUF failed to detect expired, stale timestamp metadata.'
|
||||
' Freeze attack successful.')
|
||||
else:
|
||||
self.fail('TUF failed to detect expired, stale timestamp metadata.'
|
||||
' Freeze attack successful.')
|
||||
|
||||
|
||||
|
||||
|
|
@ -416,8 +397,8 @@ def test_with_tuf(self):
|
|||
# Set ts to expire in 1 month.
|
||||
ts_expiry_time = time.time() + 2630000
|
||||
|
||||
# Set snapshot to expire in 1 second.
|
||||
snapshot_expiry_time = time.time() + 1
|
||||
# Set snapshot to expire in 1 hour.
|
||||
snapshot_expiry_time = time.time() + 60*60
|
||||
|
||||
ts_datetime_object = tuf.formats.unix_timestamp_to_datetime(
|
||||
int(ts_expiry_time))
|
||||
|
|
@ -432,28 +413,23 @@ def test_with_tuf(self):
|
|||
shutil.copytree(os.path.join(self.repository_directory, 'metadata.staged'),
|
||||
os.path.join(self.repository_directory, 'metadata'))
|
||||
|
||||
# Wait just long enough for the Snapshot metadata (which is now on the
|
||||
# repository) to expire.
|
||||
time.sleep(max(0, snapshot_expiry_time - time.time() + 1))
|
||||
# mock current time to one second after snapshot expiry
|
||||
mock_time = mock.Mock()
|
||||
mock_time.return_value = snapshot_expiry_time + 1
|
||||
with mock.patch('time.time', mock_time):
|
||||
try:
|
||||
# We expect the following refresh() to raise a NoWorkingMirrorError.
|
||||
self.repository_updater.refresh()
|
||||
|
||||
except tuf.exceptions.NoWorkingMirrorError as e:
|
||||
# Make sure the contained error is ExpiredMetadataError
|
||||
for mirror_url, mirror_error in six.iteritems(e.mirror_errors):
|
||||
self.assertTrue(isinstance(mirror_error, tuf.exceptions.ExpiredMetadataError))
|
||||
self.assertTrue(mirror_url.endswith('snapshot.json'))
|
||||
|
||||
try:
|
||||
# We expect the following refresh() to raise a NoWorkingMirrorError.
|
||||
self.repository_updater.refresh()
|
||||
|
||||
except tuf.exceptions.NoWorkingMirrorError as e:
|
||||
# NoWorkingMirrorError indicates that we did not find valid, unexpired
|
||||
# metadata at any mirror. That exception class preserves the errors from
|
||||
# each mirror. We now assert that for each mirror, the particular error
|
||||
# detected was that metadata was expired (the Snapshot we manually
|
||||
# expired).
|
||||
for mirror_url, mirror_error in six.iteritems(e.mirror_errors):
|
||||
self.assertTrue(isinstance(mirror_error, tuf.exceptions.ExpiredMetadataError))
|
||||
self.assertTrue(mirror_url.endswith('snapshot.json'))
|
||||
|
||||
else:
|
||||
self.fail('TUF failed to detect expired, stale Snapshot metadata.'
|
||||
' Freeze attack successful.')
|
||||
else:
|
||||
self.fail('TUF failed to detect expired, stale Snapshot metadata.'
|
||||
' Freeze attack successful.')
|
||||
|
||||
# The client should have rejected the malicious Snapshot metadata, and
|
||||
# distrusted the local snapshot file that is no longer valid.
|
||||
|
|
|
|||
Loading…
Reference in a new issue