python-tuf/tests/test_slow_retrieval_attack.py
Sebastien Awwad dad9f9f11b
test: extend sleep in testing after spawning server processes
This adds to PR #799; please see that for details.  In short,
tests sometimes fail on slow test systems (primarily on AppVeyor)
if we don't have long enough delays.

Signed-off-by: Sebastien Awwad <sebastien.awwad@gmail.com>
2018-11-06 09:59:24 -05:00

327 lines
14 KiB
Python
Executable file

#!/usr/bin/env python
# Copyright 2012 - 2017, New York University and the TUF contributors
# SPDX-License-Identifier: MIT OR Apache-2.0
"""
<Program Name>
test_slow_retrieval_attack.py
<Author>
Konstantin Andrianov.
<Started>
March 13, 2012.
April 5, 2014.
Refactored to use the 'unittest' module (test conditions in code, rather
than verifying text output), use pre-generated repository files, and
discontinue use of the old repository tools. Expanded comments and modified
previous setup. -vladimir.v.diaz
<Copyright>
See LICENSE-MIT OR LICENSE for licensing information.
<Purpose>
Simulate a slow retrieval attack, where an attacker is able to prevent clients
from receiving updates by responding to client requests so slowly that updates
never complete. Test cases included for two types of slow retrievals: data
that slowly trickles in, and data that is only returned after a long time
delay. TUF prevents slow retrieval attacks by ensuring the download rate
does not fall below a required rate (tuf.settings.MIN_AVERAGE_DOWNLOAD_SPEED).
Note: There is no difference between 'updates' and 'target' files.
# TODO: Consider additional tests for slow metadata download. Tests here only
use slow target download.
"""
# Help with Python 3 compatibility, where the print statement is a function, an
# implicit relative import is invalid, and the '/' operator performs true
# division. Example: print 'hello world' raises a 'SyntaxError' exception.
from __future__ import print_function
from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals
import os
import sys
import tempfile
import random
import time
import shutil
import json
import subprocess
import logging
import sys
import unittest
import tuf.log
import tuf.client.updater as updater
import tuf.unittest_toolbox as unittest_toolbox
import tuf.repository_tool as repo_tool
import tuf.roledb
import tuf.keydb
import six
logger = logging.getLogger('tuf.test_slow_retrieval_attack')
repo_tool.disable_console_log_messages()
class TestSlowRetrievalAttack(unittest_toolbox.Modified_TestCase):
@classmethod
def setUpClass(cls):
# setUpClass() is called before any of the test cases are executed.
# Create a temporary directory to store the repository, metadata, and target
# files. 'temporary_directory' must be deleted in TearDownModule() so that
# temporary files are always removed, even when exceptions occur.
cls.temporary_directory = tempfile.mkdtemp(dir=os.getcwd())
cls.SERVER_PORT = random.randint(30000, 45000)
@classmethod
def tearDownClass(cls):
# tearDownModule() is called after all the test cases have run.
# http://docs.python.org/2/library/unittest.html#class-and-module-fixtures
# Remove the temporary repository directory, which should contain all the
# metadata, targets, and key files generated of all the test cases.
shutil.rmtree(cls.temporary_directory)
def _start_slow_server(self, mode):
# Launch a SimpleHTTPServer (serves files in the current directory).
# Test cases will request metadata and target files that have been
# pre-generated in 'tuf/tests/repository_data', which will be served by the
# SimpleHTTPServer launched here. The test cases of this unit test assume
# the pre-generated metadata files have a specific structure, such
# as a delegated role 'targets/role1', three target files, five key files,
# etc.
command = ['python', 'slow_retrieval_server.py', str(self.SERVER_PORT), mode]
server_process = subprocess.Popen(command, stderr=subprocess.PIPE)
logger.info('Slow Retrieval Server process started.')
logger.info('Server process id: '+str(server_process.pid))
logger.info('Serving on port: '+str(self.SERVER_PORT))
url = 'http://localhost:'+str(self.SERVER_PORT) + os.path.sep
# NOTE: Following error is raised if a delay is not long enough:
# <urlopen error [Errno 111] Connection refused>
# or, on Windows:
# Failed to establish a new connection: [Errno 111] Connection refused'
# 1s led to occasional failures in automated builds on AppVeyor, so
# increasing this to 3s, sadly.
time.sleep(3)
return server_process
def _stop_slow_server(self, server_process):
# Kill the SimpleHTTPServer process.
if server_process.returncode is None:
logger.info('Server process '+str(server_process.pid)+' terminated.')
server_process.kill()
def setUp(self):
# We are inheriting from custom class.
unittest_toolbox.Modified_TestCase.setUp(self)
self.repository_name = 'test_repository1'
# Copy the original repository files provided in the test folder so that
# any modifications made to repository files are restricted to the copies.
# The 'repository_data' directory is expected to exist in 'tuf/tests/'.
original_repository_files = os.path.join(os.getcwd(), 'repository_data')
temporary_repository_root = \
self.make_temp_directory(directory=self.temporary_directory)
# The original repository, keystore, and client directories will be copied
# for each test case.
original_repository = os.path.join(original_repository_files, 'repository')
original_client = os.path.join(original_repository_files, 'client')
original_keystore = os.path.join(original_repository_files, 'keystore')
# Save references to the often-needed client repository directories.
# Test cases need these references to access metadata and target files.
self.repository_directory = \
os.path.join(temporary_repository_root, 'repository')
self.client_directory = os.path.join(temporary_repository_root, 'client')
self.keystore_directory = os.path.join(temporary_repository_root, 'keystore')
# Copy the original 'repository', 'client', and 'keystore' directories
# to the temporary repository the test cases can use.
shutil.copytree(original_repository, self.repository_directory)
shutil.copytree(original_client, self.client_directory)
shutil.copytree(original_keystore, self.keystore_directory)
# Produce a longer target file than exists in the other test repository
# data, to provide for a long-duration slow attack. Then we'll write new
# top-level metadata that includes a hash over that file, and provide that
# metadata to the client as well.
# The slow retrieval server, in mode 2 (1 byte per second), will only
# sleep for a total of (target file size) seconds. Add a target file
# that contains sufficient number of bytes to trigger a slow retrieval
# error. A transfer should not be permitted to take 1 second per byte
# transferred. Because this test is currently expected to fail, I'm
# limiting the size to 10 bytes (10 seconds) to avoid expected testing
# delays.... Consider increasing again after fix, to, e.g. 400.
total_bytes = 10
repository = repo_tool.load_repository(self.repository_directory)
file1_filepath = os.path.join(self.repository_directory, 'targets',
'file1.txt')
with open(file1_filepath, 'wb') as file_object:
data = 'a' * int(round(total_bytes))
file_object.write(data.encode('utf-8'))
key_file = os.path.join(self.keystore_directory, 'timestamp_key')
timestamp_private = repo_tool.import_ed25519_privatekey_from_file(key_file,
'password')
key_file = os.path.join(self.keystore_directory, 'snapshot_key')
snapshot_private = repo_tool.import_ed25519_privatekey_from_file(key_file,
'password')
key_file = os.path.join(self.keystore_directory, 'targets_key')
targets_private = repo_tool.import_ed25519_privatekey_from_file(key_file,
'password')
repository.targets.load_signing_key(targets_private)
repository.snapshot.load_signing_key(snapshot_private)
repository.timestamp.load_signing_key(timestamp_private)
repository.writeall()
# Move the staged metadata to the "live" metadata.
shutil.rmtree(os.path.join(self.repository_directory, 'metadata'))
shutil.copytree(os.path.join(self.repository_directory, 'metadata.staged'),
os.path.join(self.repository_directory, 'metadata'))
# Since we've changed the repository metadata in this setup (by lengthening
# a target file and then writing new metadata), we also have to update the
# client metadata to get to the expected initial state, where the client
# knows the right target info (and so expects the right, longer target
# length.
# We'll skip using updater.refresh since we don't have a server running,
# and we'll update the metadata locally, manually.
shutil.rmtree(os.path.join(
self.client_directory, self.repository_name, 'metadata', 'current'))
shutil.copytree(os.path.join(self.repository_directory, 'metadata'),
os.path.join(self.client_directory, self.repository_name, 'metadata',
'current'))
# Set the url prefix required by the 'tuf/client/updater.py' updater.
# 'path/to/tmp/repository' -> 'localhost:8001/tmp/repository'.
repository_basepath = self.repository_directory[len(os.getcwd()):]
url_prefix = \
'http://localhost:' + str(self.SERVER_PORT) + repository_basepath
# Setting 'tuf.settings.repository_directory' with the temporary client
# directory copied from the original repository files.
tuf.settings.repositories_directory = self.client_directory
self.repository_mirrors = {'mirror1': {'url_prefix': url_prefix,
'metadata_path': 'metadata',
'targets_path': 'targets',
'confined_target_dirs': ['']}}
# Create the repository instance. The test cases will use this client
# updater to refresh metadata, fetch target files, etc.
self.repository_updater = updater.Updater(self.repository_name,
self.repository_mirrors)
def tearDown(self):
# Modified_TestCase.tearDown() automatically deletes temporary files and
# directories that may have been created during each test case.
unittest_toolbox.Modified_TestCase.tearDown(self)
tuf.roledb.clear_roledb(clear_all=True)
tuf.keydb.clear_keydb(clear_all=True)
def test_with_tuf_mode_1(self):
# Simulate a slow retrieval attack.
# 'mode_1': When download begins,the server blocks the download for a long
# time by doing nothing before it sends the first byte of data.
server_process = self._start_slow_server('mode_1')
# Verify that the TUF client detects replayed metadata and refuses to
# continue the update process.
client_filepath = os.path.join(self.client_directory, 'file1.txt')
try:
file1_target = self.repository_updater.get_one_valid_targetinfo('file1.txt')
self.repository_updater.download_target(file1_target, self.client_directory)
# Verify that the specific 'tuf.exceptions.SlowRetrievalError' exception is raised by
# each mirror.
except tuf.exceptions.NoWorkingMirrorError as exception:
for mirror_url, mirror_error in six.iteritems(exception.mirror_errors):
url_prefix = self.repository_mirrors['mirror1']['url_prefix']
url_file = os.path.join(url_prefix, 'targets', 'file1.txt')
# Verify that 'file1.txt' is the culprit.
self.assertEqual(url_file.replace('\\', '/'), mirror_url)
self.assertTrue(isinstance(mirror_error, tuf.exceptions.SlowRetrievalError))
else:
self.fail('TUF did not prevent a slow retrieval attack.')
finally:
self._stop_slow_server(server_process)
# The following test fails as a result of a change to TUF's download code.
# Rather than constructing urllib2 requests, we now use the requests library.
# This solves an HTTPS proxy issue, but has for the moment deprived us of a
# way to prevent certain this kind of slow retrieval attack.
# See conversation in PR: https://github.com/theupdateframework/tuf/pull/781
# TODO: Update download code to resolve the slow retrieval vulnerability.
@unittest.expectedFailure
def test_with_tuf_mode_2(self):
# Simulate a slow retrieval attack.
# 'mode_2': During the download process, the server blocks the download
# by sending just several characters every few seconds.
server_process = self._start_slow_server('mode_2')
client_filepath = os.path.join(self.client_directory, 'file1.txt')
original_average_download_speed = tuf.settings.MIN_AVERAGE_DOWNLOAD_SPEED
tuf.settings.MIN_AVERAGE_DOWNLOAD_SPEED = 3
try:
file1_target = self.repository_updater.get_one_valid_targetinfo('file1.txt')
self.repository_updater.download_target(file1_target, self.client_directory)
# Verify that the specific 'tuf.exceptions.SlowRetrievalError' exception is
# raised by each mirror. 'file1.txt' should be large enough to trigger a
# slow retrieval attack, otherwise the expected exception may not be
# consistently raised.
except tuf.exceptions.NoWorkingMirrorError as exception:
for mirror_url, mirror_error in six.iteritems(exception.mirror_errors):
url_prefix = self.repository_mirrors['mirror1']['url_prefix']
url_file = os.path.join(url_prefix, 'targets', 'file1.txt')
# Verify that 'file1.txt' is the culprit.
self.assertEqual(url_file.replace('\\', '/'), mirror_url)
self.assertTrue(isinstance(mirror_error, tuf.exceptions.SlowRetrievalError))
else:
# Another possibility is to check for a successfully downloaded
# 'file1.txt' at this point.
self.fail('TUF did not prevent a slow retrieval attack.')
finally:
self._stop_slow_server(server_process)
tuf.settings.MIN_AVERAGE_DOWNLOAD_SPEED = original_average_download_speed
if __name__ == '__main__':
unittest.main()