From dc59cc0eeb808a1102cd8991c63cfd7f36d69bd2 Mon Sep 17 00:00:00 2001 From: Kon Date: Sat, 9 Feb 2013 02:04:58 -0500 Subject: [PATCH] Modified test_util.py [in progress], util.py and test_download.py --- src/tuf/tests/test_download.py | 1 - src/tuf/tests/test_util.py | 138 +++++++++++++++++++++++++++++---- src/tuf/util.py | 42 +++++----- 3 files changed, 145 insertions(+), 36 deletions(-) diff --git a/src/tuf/tests/test_download.py b/src/tuf/tests/test_download.py index 57639b29..d7770b55 100755 --- a/src/tuf/tests/test_download.py +++ b/src/tuf/tests/test_download.py @@ -71,7 +71,6 @@ def setUp(self): m = hashlib.md5() m.update(self.target_data) digest = m.hexdigest() - print digest self.target_hash = {'md5':digest} diff --git a/src/tuf/tests/test_util.py b/src/tuf/tests/test_util.py index 341c2f96..328d47cb 100644 --- a/src/tuf/tests/test_util.py +++ b/src/tuf/tests/test_util.py @@ -17,8 +17,11 @@ """ import os +import sys import gzip import shutil +import logging +import tuf.hash import tempfile import unittest import unittest_toolbox @@ -26,21 +29,37 @@ import tuf import tuf.util as util +# Disable/Enable logging. Uncomment to Disable. +logging.getLogger('tuf') +logging.disable(logging.CRITICAL) class TestUtil(unittest_toolbox.Modified_TestCase): def setUp(self): unittest_toolbox.Modified_TestCase.setUp(self) + util.tempfile.TemporaryFile = tempfile.TemporaryFile self.temp_fileobj = util.TempFile() + def tearDown(self): unittest_toolbox.Modified_TestCase.tearDown(self) self.temp_fileobj.close_temp_file() - def _verify_temp_dir(self, config_temp_dir=None): + # TODO + def testUtil_A1_close_temp_file(self): + pass + + + + def _extract_tempfile_directory(self, config_temp_dir=None): + """[Helper] Takes a directory (essentially specified in the config.py as + 'temporary_directory') and substitutes tempfile.TemporaryFile() with + tempfile.mkstemp() in order to extract actual directory of the stored + tempfile. Returns the config's temporary directory (or default temp + directory) and actual directory.""" # Patching 'tuf.conf.temporary_directory'. util.tuf.conf.temporary_directory = config_temp_dir @@ -51,8 +70,10 @@ def _verify_temp_dir(self, config_temp_dir=None): # Patching 'tempfile.TemporaryFile()' (by substituting # temfile.TemporaryFile() with tempfile.mkstemp()) in order to get the # directory of the stored tempfile object. + saved_tempfile_TemporaryFile = util.tempfile.TemporaryFile util.tempfile.TemporaryFile = tempfile.mkstemp _temp_fileobj = util.TempFile() + util.tempfile.TemporaryFile = saved_tempfile_TemporaryFile junk, _tempfilepath = _temp_fileobj.temporary_file _tempfile_dir = os.path.dirname(_tempfilepath) @@ -64,45 +85,51 @@ def _verify_temp_dir(self, config_temp_dir=None): return config_temp_dir, _tempfile_dir + - def test_init(self): - """TempFile initialization""" - # Goal: To verify temporary fileobj store directories. + def testUtil_A2_Init(self): + # Goal: Verify that tempfile is stored in an appropriate temp directory. # Test: Expected input verification. config_temp_dirs = [None, self.make_temp_directory()] for config_temp_dir in config_temp_dirs: - config_temp_dir, actual_dir = self._verify_temp_dir(config_temp_dir) + config_temp_dir, actual_dir = \ + self._extract_tempfile_directory(config_temp_dir) self.assertEquals(config_temp_dir, actual_dir) # Test: Unexpected input handling. config_temp_dirs = [self.random_string(), 123, ['a'], {'a':1}] for config_temp_dir in config_temp_dirs: - config_temp_dir, actual_dir = self._verify_temp_dir(config_temp_dir) + config_temp_dir, actual_dir = \ + self._extract_tempfile_directory(config_temp_dir) self.assertEquals(tempfile.gettempdir(), actual_dir) + + # TODO def testTempFile_read(self): pass + + # TODO def testTempFile_write(self): pass - def testTempFile_move(self): - """Expected behaviour of 'move' method""" + + def testUtil_A3_TempFile_Move(self): # Destination directory to save the temporary file in. dest_temp_dir = self.make_temp_directory() dest_path = os.path.join(dest_temp_dir, self.random_string()) self.temp_fileobj.write(self.random_string()) - self.temp_fileobj.move(dest_path) self.assertTrue(dest_path) + def _compress_existing_file(self, filepath): - """Compresses file 'filepath' and returns file path of + """[Helper]Compresses file 'filepath' and returns file path of the compresses file.""" # NOTE: DO NOT forget to remove the newly created compressed file! if os.path.exists(filepath): @@ -118,7 +145,9 @@ def _compress_existing_file(self, filepath): sys.exit(1) + def _decompress_file(self, compressed_filepath): + """[Helper]""" if os.path.exists(compressed_filepath): f = gzip.open(compressed_filepath, 'rb') file_content = f.read() @@ -130,8 +159,8 @@ def _decompress_file(self, compressed_filepath): sys.exit(1) - def testTempFile_decompress_temp_file_object_1(self): - """Expected behaviour of 'decompress_temp_file_object' method""" + + def testUtil_A4_TempFile_DecompressTempFileObject(self): # Setup: generate a temp file (self.make_temp_data_file()), # compress it. Write it to self.temp_fileobj(). filepath = self.make_temp_data_file() @@ -147,8 +176,7 @@ def testTempFile_decompress_temp_file_object_1(self): for arg in bogus_args: self.assertRaises(tuf.Error, self.temp_fileobj.decompress_temp_file_object, arg) - - self.temp_fileobj.decompress_temp_file_object('gzip') # Decompress! + self.temp_fileobj.decompress_temp_file_object('gzip') self.assertEquals(self.temp_fileobj.read(), fileobj.read()) # Checking the content of the TempFile's '_orig_file' instance. @@ -163,17 +191,93 @@ def testTempFile_decompress_temp_file_object_1(self): self.temp_fileobj.decompress_temp_file_object,'gzip') - def testUtil_get_file_details_1(self): - pass + + def testUtil_B1_GetFileDetails(self): + # Goal: Verify proper output given certain expected/unexpected input. + + # Making a temporary file. + filepath = self.make_temp_data_file() + + # Computing the hash and length of the tempfile. + digest_object = tuf.hash.digest_filename(filepath, algorithm='sha256') + file_hash = {'sha256' : digest_object.hexdigest()} + file_length = os.path.getsize(filepath) + + # Test: Expected input. + self.assertEquals(util.get_file_details(filepath), (file_length, file_hash)) + + # Test: Incorrect input. + bogus_inputs = [self.random_string(), 1234, [self.random_string()], + {'a', 'a'}, None] + for bogus_input in bogus_inputs: + if isinstance(bogus_input, basestring): + self.assertRaises(tuf.Error, util.get_file_details, bogus_input) + else: + self.assertRaises(tuf.FormatError, util.get_file_details, bogus_input) + + + + def testUtil_B2_EnsureParentDir(self): + existing_parent_dir = self.make_temp_directory() + non_existing_parent_dir = os.path.join(existing_parent_dir, 'a', 'b') + + for parent_dir in [existing_parent_dir, non_existing_parent_dir, 12, [3]]: + if isinstance(parent_dir, basestring): + util.ensure_parent_dir(os.path.join(parent_dir, 'a.txt')) + self.assertTrue(os.path.isdir(parent_dir)) + else: + self.assertRaises(tuf.FormatError, util.ensure_parent_dir, parent_dir) + + + + def testUtil_B3_PathInConfinedPaths(self): + # Goal: Provide invalid input for 'test_path' and 'confined_paths'. + # Include inputs like: '[1, 2, "a"]' and such... + Errors = (tuf.FormatError, TypeError) + list_of_confined_paths = ['a', 12, {'a':'a'}, [1]] + list_of_paths = [12, ['a'], {'a':'a'}, 'a'] + for bogus_confined_paths in list_of_confined_paths: + for bogus_path in list_of_paths: + self.assertRaises(tuf.FormatError, util.path_in_confined_paths, + bogus_path, bogus_confined_paths) + + # Test: Inputs that evaluate to False. + for confined_paths in [['/a/b/c.txt', 'a/b/c'], ['/a/b/c/d/e/']]: + for path in ['/a/b/d.txt', 'a', 'a/b/c/d/']: + self.assertFalse(util.path_in_confined_paths(path, confined_paths)) + + # Test: Inputs that evaluate to True. + for confined_paths in [[''], ['/a/b/c.txt', '/a/', '/a/b/c/d/']]: + for path in ['a/b/d.txt', 'a/b/x', 'a/b', 'a/b/c/d/g']: + self.assertTrue(util.path_in_confined_paths(path, confined_paths)) + def testUtil_B4_ImportJson(self): + self.assertTrue('json' or 'simplejson' in sys.modules) + def testUtil_B5_LoadJsonString(self): + data = ['a', {'b': ['c', None, 30.3, 29]}] + json_string = util.json.dumps(data) + self.assertEquals(data, util.load_json_string(json_string)) + + + + def testUtil_B6_LoadJsonFile(self): + data = ['a', {'b': ['c', None, 30.3, 29]}] + filepath = self.make_temp_file() + fileobj = open(filepath, 'wb') + util.json.dump(data, fileobj) + fileobj.close() + self.assertEquals(data, util.load_json_file(filepath)) + Errors = (tuf.FormatError, tuf.Error) + for bogus_arg in ['a', 1, ['a'], {'a':'b'}]: + self.assertRaises(Errors, util.load_json_file, bogus_arg) + # Run unit test. - suite = unittest.TestLoader().loadTestsFromTestCase(TestUtil) unittest.TextTestRunner(verbosity=2).run(suite) diff --git a/src/tuf/util.py b/src/tuf/util.py index a7b8fff9..dc30cc39 100755 --- a/src/tuf/util.py +++ b/src/tuf/util.py @@ -64,7 +64,7 @@ def _default_temporary_directory(self, prefix): raise tuf.Error(err) - def __init__(self, prefix='tmp'): + def __init__(self, prefix='tuf_temp_'): """ Initializes TempFile. @@ -298,7 +298,7 @@ def close_temp_file(self): # TODO: remove 'repository_directory'. Instead specify hash algorithm. -def get_file_details(file_path, repository_directory=None): +def get_file_details(file_path): """ To get file's length and hash information. The hash is computed using @@ -319,7 +319,7 @@ def get_file_details(file_path, repository_directory=None): """ # Making sure that the format of 'file_path' is a path string. # tuf.FormatError is raised on incorrect format. - tuf.formats.RELPATH_SCHEMA.check_match(file_path) + tuf.formats.PATH_SCHEMA.check_match(file_path) # Does the path exists? if not os.path.exists(file_path): @@ -349,6 +349,9 @@ def ensure_parent_dir(filename): To ensure existence of the parent directory of 'filename'. If the parent directory of 'name' does not exist, create it. + Ex: If 'filename' is '/a/b/c/d.txt', and only the directory '/a/b/' + exists, then directory '/a/b/c/d/' will be created. + filename: A path string. @@ -387,7 +390,7 @@ def path_in_confined_paths(test_path, confined_paths): A list or a tuple of path strings. - tuf.TypeError, if the arguments are improperly typed. + tuf.FormatError: On incorrect format of the input. Boolean. True, if path is either the empty string @@ -397,13 +400,8 @@ def path_in_confined_paths(test_path, confined_paths): # Do the arguments are the correct format? # Raise 'tuf.FormatError' if there is a mismatch. - tuf.formats.PATH_SCHEMA.check_match(test_path) - tuf.formats.PATHS_SCHEMA.check_match(confined_paths) - - if not isinstance(test_path, basestring): - raise TypeError('The test path must be a string') - if not isinstance(confined_paths, (list, tuple)): - raise TypeError('The confined paths must be a list or a tuple') + tuf.formats.RELPATH_SCHEMA.check_match(test_path) + tuf.formats.RELPATHS_SCHEMA.check_match(confined_paths) for pattern in confined_paths: # Ignore slashes at the beginning. @@ -416,7 +414,6 @@ def path_in_confined_paths(test_path, confined_paths): # Get the directory name (i.e., strip off the file_path+extension) directory_name = os.path.dirname(test_path) - if directory_name == os.path.dirname(pattern): return True @@ -497,14 +494,14 @@ def dump(*k, **v): raise ImportError('Could not import a working json module') - json = import_json() + def load_json_string(data): """ - To deserialize a JSON object from a string 'data'. + Deserialize a JSON object from a string 'data'. data: @@ -518,21 +515,30 @@ def load_json_string(data): return json.loads(data) -def load_json_file(filename): + +def load_json_file(filepath): """ - To deserialize a JSON object from a file containing the object. + Deserialize a JSON object from a file containing the object. data: - A JSON string. + Absolute path of JSON file. Deserialized object. For example a dictionary. """ - fp = open(filename) + # Making sure that the format of 'file_path' is a path string. + # tuf.FormatError is raised on incorrect format. + tuf.formats.PATH_SCHEMA.check_match(filepath) + + try: + fp = open(filepath) + except IOError, err: + raise tuf.Error(err) + try: return json.load(fp) finally: