mirror of
https://github.com/theupdateframework/python-tuf
synced 2026-05-24 10:08:28 +00:00
Modified test_util.py [in progress], util.py and test_download.py
This commit is contained in:
parent
1d9f5e8682
commit
dc59cc0eeb
3 changed files with 145 additions and 36 deletions
|
|
@ -71,7 +71,6 @@ def setUp(self):
|
|||
m = hashlib.md5()
|
||||
m.update(self.target_data)
|
||||
digest = m.hexdigest()
|
||||
print digest
|
||||
self.target_hash = {'md5':digest}
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -17,8 +17,11 @@
|
|||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import gzip
|
||||
import shutil
|
||||
import logging
|
||||
import tuf.hash
|
||||
import tempfile
|
||||
import unittest
|
||||
import unittest_toolbox
|
||||
|
|
@ -26,21 +29,37 @@
|
|||
import tuf
|
||||
import tuf.util as util
|
||||
|
||||
# Disable/Enable logging. Uncomment to Disable.
|
||||
logging.getLogger('tuf')
|
||||
logging.disable(logging.CRITICAL)
|
||||
|
||||
|
||||
class TestUtil(unittest_toolbox.Modified_TestCase):
|
||||
|
||||
def setUp(self):
|
||||
unittest_toolbox.Modified_TestCase.setUp(self)
|
||||
util.tempfile.TemporaryFile = tempfile.TemporaryFile
|
||||
self.temp_fileobj = util.TempFile()
|
||||
|
||||
|
||||
|
||||
def tearDown(self):
|
||||
unittest_toolbox.Modified_TestCase.tearDown(self)
|
||||
self.temp_fileobj.close_temp_file()
|
||||
|
||||
|
||||
def _verify_temp_dir(self, config_temp_dir=None):
|
||||
# TODO
|
||||
def testUtil_A1_close_temp_file(self):
|
||||
pass
|
||||
|
||||
|
||||
|
||||
def _extract_tempfile_directory(self, config_temp_dir=None):
|
||||
"""[Helper] Takes a directory (essentially specified in the config.py as
|
||||
'temporary_directory') and substitutes tempfile.TemporaryFile() with
|
||||
tempfile.mkstemp() in order to extract actual directory of the stored
|
||||
tempfile. Returns the config's temporary directory (or default temp
|
||||
directory) and actual directory."""
|
||||
# Patching 'tuf.conf.temporary_directory'.
|
||||
util.tuf.conf.temporary_directory = config_temp_dir
|
||||
|
||||
|
|
@ -51,8 +70,10 @@ def _verify_temp_dir(self, config_temp_dir=None):
|
|||
# Patching 'tempfile.TemporaryFile()' (by substituting
|
||||
# temfile.TemporaryFile() with tempfile.mkstemp()) in order to get the
|
||||
# directory of the stored tempfile object.
|
||||
saved_tempfile_TemporaryFile = util.tempfile.TemporaryFile
|
||||
util.tempfile.TemporaryFile = tempfile.mkstemp
|
||||
_temp_fileobj = util.TempFile()
|
||||
util.tempfile.TemporaryFile = saved_tempfile_TemporaryFile
|
||||
junk, _tempfilepath = _temp_fileobj.temporary_file
|
||||
_tempfile_dir = os.path.dirname(_tempfilepath)
|
||||
|
||||
|
|
@ -64,45 +85,51 @@ def _verify_temp_dir(self, config_temp_dir=None):
|
|||
|
||||
return config_temp_dir, _tempfile_dir
|
||||
|
||||
|
||||
|
||||
def test_init(self):
|
||||
"""TempFile initialization"""
|
||||
# Goal: To verify temporary fileobj store directories.
|
||||
def testUtil_A2_Init(self):
|
||||
# Goal: Verify that tempfile is stored in an appropriate temp directory.
|
||||
|
||||
# Test: Expected input verification.
|
||||
config_temp_dirs = [None, self.make_temp_directory()]
|
||||
for config_temp_dir in config_temp_dirs:
|
||||
config_temp_dir, actual_dir = self._verify_temp_dir(config_temp_dir)
|
||||
config_temp_dir, actual_dir = \
|
||||
self._extract_tempfile_directory(config_temp_dir)
|
||||
self.assertEquals(config_temp_dir, actual_dir)
|
||||
|
||||
# Test: Unexpected input handling.
|
||||
config_temp_dirs = [self.random_string(), 123, ['a'], {'a':1}]
|
||||
for config_temp_dir in config_temp_dirs:
|
||||
config_temp_dir, actual_dir = self._verify_temp_dir(config_temp_dir)
|
||||
config_temp_dir, actual_dir = \
|
||||
self._extract_tempfile_directory(config_temp_dir)
|
||||
self.assertEquals(tempfile.gettempdir(), actual_dir)
|
||||
|
||||
|
||||
|
||||
# TODO
|
||||
def testTempFile_read(self):
|
||||
pass
|
||||
|
||||
|
||||
|
||||
# TODO
|
||||
def testTempFile_write(self):
|
||||
pass
|
||||
|
||||
|
||||
def testTempFile_move(self):
|
||||
"""Expected behaviour of 'move' method"""
|
||||
|
||||
def testUtil_A3_TempFile_Move(self):
|
||||
# Destination directory to save the temporary file in.
|
||||
dest_temp_dir = self.make_temp_directory()
|
||||
dest_path = os.path.join(dest_temp_dir, self.random_string())
|
||||
self.temp_fileobj.write(self.random_string())
|
||||
|
||||
self.temp_fileobj.move(dest_path)
|
||||
self.assertTrue(dest_path)
|
||||
|
||||
|
||||
|
||||
def _compress_existing_file(self, filepath):
|
||||
"""Compresses file 'filepath' and returns file path of
|
||||
"""[Helper]Compresses file 'filepath' and returns file path of
|
||||
the compresses file."""
|
||||
# NOTE: DO NOT forget to remove the newly created compressed file!
|
||||
if os.path.exists(filepath):
|
||||
|
|
@ -118,7 +145,9 @@ def _compress_existing_file(self, filepath):
|
|||
sys.exit(1)
|
||||
|
||||
|
||||
|
||||
def _decompress_file(self, compressed_filepath):
|
||||
"""[Helper]"""
|
||||
if os.path.exists(compressed_filepath):
|
||||
f = gzip.open(compressed_filepath, 'rb')
|
||||
file_content = f.read()
|
||||
|
|
@ -130,8 +159,8 @@ def _decompress_file(self, compressed_filepath):
|
|||
sys.exit(1)
|
||||
|
||||
|
||||
def testTempFile_decompress_temp_file_object_1(self):
|
||||
"""Expected behaviour of 'decompress_temp_file_object' method"""
|
||||
|
||||
def testUtil_A4_TempFile_DecompressTempFileObject(self):
|
||||
# Setup: generate a temp file (self.make_temp_data_file()),
|
||||
# compress it. Write it to self.temp_fileobj().
|
||||
filepath = self.make_temp_data_file()
|
||||
|
|
@ -147,8 +176,7 @@ def testTempFile_decompress_temp_file_object_1(self):
|
|||
for arg in bogus_args:
|
||||
self.assertRaises(tuf.Error,
|
||||
self.temp_fileobj.decompress_temp_file_object, arg)
|
||||
|
||||
self.temp_fileobj.decompress_temp_file_object('gzip') # Decompress!
|
||||
self.temp_fileobj.decompress_temp_file_object('gzip')
|
||||
self.assertEquals(self.temp_fileobj.read(), fileobj.read())
|
||||
|
||||
# Checking the content of the TempFile's '_orig_file' instance.
|
||||
|
|
@ -163,17 +191,93 @@ def testTempFile_decompress_temp_file_object_1(self):
|
|||
self.temp_fileobj.decompress_temp_file_object,'gzip')
|
||||
|
||||
|
||||
def testUtil_get_file_details_1(self):
|
||||
pass
|
||||
|
||||
def testUtil_B1_GetFileDetails(self):
|
||||
# Goal: Verify proper output given certain expected/unexpected input.
|
||||
|
||||
# Making a temporary file.
|
||||
filepath = self.make_temp_data_file()
|
||||
|
||||
# Computing the hash and length of the tempfile.
|
||||
digest_object = tuf.hash.digest_filename(filepath, algorithm='sha256')
|
||||
file_hash = {'sha256' : digest_object.hexdigest()}
|
||||
file_length = os.path.getsize(filepath)
|
||||
|
||||
# Test: Expected input.
|
||||
self.assertEquals(util.get_file_details(filepath), (file_length, file_hash))
|
||||
|
||||
# Test: Incorrect input.
|
||||
bogus_inputs = [self.random_string(), 1234, [self.random_string()],
|
||||
{'a', 'a'}, None]
|
||||
for bogus_input in bogus_inputs:
|
||||
if isinstance(bogus_input, basestring):
|
||||
self.assertRaises(tuf.Error, util.get_file_details, bogus_input)
|
||||
else:
|
||||
self.assertRaises(tuf.FormatError, util.get_file_details, bogus_input)
|
||||
|
||||
|
||||
|
||||
def testUtil_B2_EnsureParentDir(self):
|
||||
existing_parent_dir = self.make_temp_directory()
|
||||
non_existing_parent_dir = os.path.join(existing_parent_dir, 'a', 'b')
|
||||
|
||||
for parent_dir in [existing_parent_dir, non_existing_parent_dir, 12, [3]]:
|
||||
if isinstance(parent_dir, basestring):
|
||||
util.ensure_parent_dir(os.path.join(parent_dir, 'a.txt'))
|
||||
self.assertTrue(os.path.isdir(parent_dir))
|
||||
else:
|
||||
self.assertRaises(tuf.FormatError, util.ensure_parent_dir, parent_dir)
|
||||
|
||||
|
||||
|
||||
def testUtil_B3_PathInConfinedPaths(self):
|
||||
# Goal: Provide invalid input for 'test_path' and 'confined_paths'.
|
||||
# Include inputs like: '[1, 2, "a"]' and such...
|
||||
Errors = (tuf.FormatError, TypeError)
|
||||
list_of_confined_paths = ['a', 12, {'a':'a'}, [1]]
|
||||
list_of_paths = [12, ['a'], {'a':'a'}, 'a']
|
||||
for bogus_confined_paths in list_of_confined_paths:
|
||||
for bogus_path in list_of_paths:
|
||||
self.assertRaises(tuf.FormatError, util.path_in_confined_paths,
|
||||
bogus_path, bogus_confined_paths)
|
||||
|
||||
# Test: Inputs that evaluate to False.
|
||||
for confined_paths in [['/a/b/c.txt', 'a/b/c'], ['/a/b/c/d/e/']]:
|
||||
for path in ['/a/b/d.txt', 'a', 'a/b/c/d/']:
|
||||
self.assertFalse(util.path_in_confined_paths(path, confined_paths))
|
||||
|
||||
# Test: Inputs that evaluate to True.
|
||||
for confined_paths in [[''], ['/a/b/c.txt', '/a/', '/a/b/c/d/']]:
|
||||
for path in ['a/b/d.txt', 'a/b/x', 'a/b', 'a/b/c/d/g']:
|
||||
self.assertTrue(util.path_in_confined_paths(path, confined_paths))
|
||||
|
||||
|
||||
|
||||
def testUtil_B4_ImportJson(self):
|
||||
self.assertTrue('json' or 'simplejson' in sys.modules)
|
||||
|
||||
|
||||
|
||||
def testUtil_B5_LoadJsonString(self):
|
||||
data = ['a', {'b': ['c', None, 30.3, 29]}]
|
||||
json_string = util.json.dumps(data)
|
||||
self.assertEquals(data, util.load_json_string(json_string))
|
||||
|
||||
|
||||
|
||||
def testUtil_B6_LoadJsonFile(self):
|
||||
data = ['a', {'b': ['c', None, 30.3, 29]}]
|
||||
filepath = self.make_temp_file()
|
||||
fileobj = open(filepath, 'wb')
|
||||
util.json.dump(data, fileobj)
|
||||
fileobj.close()
|
||||
self.assertEquals(data, util.load_json_file(filepath))
|
||||
Errors = (tuf.FormatError, tuf.Error)
|
||||
for bogus_arg in ['a', 1, ['a'], {'a':'b'}]:
|
||||
self.assertRaises(Errors, util.load_json_file, bogus_arg)
|
||||
|
||||
|
||||
|
||||
# Run unit test.
|
||||
|
||||
suite = unittest.TestLoader().loadTestsFromTestCase(TestUtil)
|
||||
unittest.TextTestRunner(verbosity=2).run(suite)
|
||||
|
|
|
|||
|
|
@ -64,7 +64,7 @@ def _default_temporary_directory(self, prefix):
|
|||
raise tuf.Error(err)
|
||||
|
||||
|
||||
def __init__(self, prefix='tmp'):
|
||||
def __init__(self, prefix='tuf_temp_'):
|
||||
"""
|
||||
<Purpose>
|
||||
Initializes TempFile.
|
||||
|
|
@ -298,7 +298,7 @@ def close_temp_file(self):
|
|||
|
||||
|
||||
# TODO: remove 'repository_directory'. Instead specify hash algorithm.
|
||||
def get_file_details(file_path, repository_directory=None):
|
||||
def get_file_details(file_path):
|
||||
"""
|
||||
<Purpose>
|
||||
To get file's length and hash information. The hash is computed using
|
||||
|
|
@ -319,7 +319,7 @@ def get_file_details(file_path, repository_directory=None):
|
|||
"""
|
||||
# Making sure that the format of 'file_path' is a path string.
|
||||
# tuf.FormatError is raised on incorrect format.
|
||||
tuf.formats.RELPATH_SCHEMA.check_match(file_path)
|
||||
tuf.formats.PATH_SCHEMA.check_match(file_path)
|
||||
|
||||
# Does the path exists?
|
||||
if not os.path.exists(file_path):
|
||||
|
|
@ -349,6 +349,9 @@ def ensure_parent_dir(filename):
|
|||
To ensure existence of the parent directory of 'filename'. If the parent
|
||||
directory of 'name' does not exist, create it.
|
||||
|
||||
Ex: If 'filename' is '/a/b/c/d.txt', and only the directory '/a/b/'
|
||||
exists, then directory '/a/b/c/d/' will be created.
|
||||
|
||||
<Arguments>
|
||||
filename:
|
||||
A path string.
|
||||
|
|
@ -387,7 +390,7 @@ def path_in_confined_paths(test_path, confined_paths):
|
|||
A list or a tuple of path strings.
|
||||
|
||||
<Exceptions>
|
||||
tuf.TypeError, if the arguments are improperly typed.
|
||||
tuf.FormatError: On incorrect format of the input.
|
||||
|
||||
<Return>
|
||||
Boolean. True, if path is either the empty string
|
||||
|
|
@ -397,13 +400,8 @@ def path_in_confined_paths(test_path, confined_paths):
|
|||
|
||||
# Do the arguments are the correct format?
|
||||
# Raise 'tuf.FormatError' if there is a mismatch.
|
||||
tuf.formats.PATH_SCHEMA.check_match(test_path)
|
||||
tuf.formats.PATHS_SCHEMA.check_match(confined_paths)
|
||||
|
||||
if not isinstance(test_path, basestring):
|
||||
raise TypeError('The test path must be a string')
|
||||
if not isinstance(confined_paths, (list, tuple)):
|
||||
raise TypeError('The confined paths must be a list or a tuple')
|
||||
tuf.formats.RELPATH_SCHEMA.check_match(test_path)
|
||||
tuf.formats.RELPATHS_SCHEMA.check_match(confined_paths)
|
||||
|
||||
for pattern in confined_paths:
|
||||
# Ignore slashes at the beginning.
|
||||
|
|
@ -416,7 +414,6 @@ def path_in_confined_paths(test_path, confined_paths):
|
|||
|
||||
# Get the directory name (i.e., strip off the file_path+extension)
|
||||
directory_name = os.path.dirname(test_path)
|
||||
|
||||
if directory_name == os.path.dirname(pattern):
|
||||
return True
|
||||
|
||||
|
|
@ -497,14 +494,14 @@ def dump(*k, **v):
|
|||
|
||||
raise ImportError('Could not import a working json module')
|
||||
|
||||
|
||||
json = import_json()
|
||||
|
||||
|
||||
|
||||
def load_json_string(data):
|
||||
"""
|
||||
<Purpose>
|
||||
To deserialize a JSON object from a string 'data'.
|
||||
Deserialize a JSON object from a string 'data'.
|
||||
|
||||
<Arguments>
|
||||
data:
|
||||
|
|
@ -518,21 +515,30 @@ def load_json_string(data):
|
|||
return json.loads(data)
|
||||
|
||||
|
||||
def load_json_file(filename):
|
||||
|
||||
def load_json_file(filepath):
|
||||
"""
|
||||
<Purpose>
|
||||
To deserialize a JSON object from a file containing the object.
|
||||
Deserialize a JSON object from a file containing the object.
|
||||
|
||||
<Arguments>
|
||||
data:
|
||||
A JSON string.
|
||||
Absolute path of JSON file.
|
||||
|
||||
<Return>
|
||||
Deserialized object. For example a dictionary.
|
||||
|
||||
"""
|
||||
|
||||
fp = open(filename)
|
||||
# Making sure that the format of 'file_path' is a path string.
|
||||
# tuf.FormatError is raised on incorrect format.
|
||||
tuf.formats.PATH_SCHEMA.check_match(filepath)
|
||||
|
||||
try:
|
||||
fp = open(filepath)
|
||||
except IOError, err:
|
||||
raise tuf.Error(err)
|
||||
|
||||
try:
|
||||
return json.load(fp)
|
||||
finally:
|
||||
|
|
|
|||
Loading…
Reference in a new issue