Improve delegated roles loading in load_repository()

Replace the list used for the delegations graph traversal with
a deque and use a set to store already loaded roles and avoid
loops in case of cycles in the graph.
Improve comments and readability.

Signed-off-by: Teodora Sechkova <tsechkova@vmware.com>
This commit is contained in:
Teodora Sechkova 2020-06-24 13:34:05 +03:00
parent da09a22861
commit f1a6676084
No known key found for this signature in database
GPG key ID: 65F78F613EA1914E

View file

@ -39,6 +39,8 @@
import shutil
import json
from collections import deque
import tuf
import tuf.formats
import tuf.roledb
@ -3100,29 +3102,50 @@ def load_repository(repository_directory, repository_name='default',
repository, consistent_snapshot = repo_lib._load_top_level_metadata(repository,
filenames, repository_name)
# Load the delegated targets metadata and generate their fileinfo. The
# extracted fileinfo is stored in the 'meta' field of the snapshot metadata
# object.
delegated_roles_filenames = repo_lib.get_delegations_filenames(
metadata_directory, consistent_snapshot, storage_backend)
# Load the delegated targets metadata and their fileinfo.
# The delegated targets roles form a tree/graph which is traversed in a
# breadth-first-search manner starting from 'targets' in order to correctly
# load the delegations hierarchy.
targets_objects = {}
targets_objects['targets'] = repository.targets
# A list of delegated-delegating role pairs
delegations = []
delegations_filenames = repo_lib.get_delegations_filenames(metadata_directory,
consistent_snapshot, storage_backend)
# Keep the next delegations to be loaded in a deque structure which
# has the properties of a list but is designed to have fast appends
# and pops from both ends
delegations = deque()
# A set used to keep the already loaded delegations and avoid an infinite
# loop in case of cycles in the delegations graph
loaded_delegations = set()
# Top-level roles are already loaded, fetch targets and get its delegations.
# Collect a list of delegated-delegating role pairs, starting from the
# top-level targets: [('role1', 'targets'), ('role2', 'targets'), ... ]
# Store the delegations in the form of delegated-delegating role tuples,
# starting from the top-level targets:
# [('role1', 'targets'), ('role2', 'targets'), ... ]
roleinfo = tuf.roledb.get_roleinfo('targets', repository_name)
for role in roleinfo['delegations']['roles']:
delegations.append([role['name'], 'targets'])
delegations.append((role['name'], 'targets'))
# Load the delegated roles by starting from 'targets' and continuously
# appending the next level delegations to the list
for rolename, delegating_role in delegations:
metadata_path = delegations_filenames[rolename]
# Traverse the graph by appending the next delegation to the deque and
# 'pop'-ing and loading the left-most element.
while delegations:
rolename, delegating_role = delegations.popleft()
if (rolename, delegating_role) in loaded_delegations:
logger.warning('Detected cycle in the delegation graph: ' +
repr(delegating_role) + ' -> ' +
repr(rolename) +
' is reached more than once.')
continue
# Instead of adding only rolename to the set, store the already loaded
# delegated-delegating role tuples. This way a delegated role is added
# to each of its delegating roles but when the role is reached twice
# from the same delegating role an infinite loop is avoided.
loaded_delegations.add((rolename, delegating_role))
metadata_path = delegated_roles_filenames[rolename]
signable = None
try:
@ -3149,8 +3172,6 @@ def load_repository(repository_directory, repository_name='default',
roleinfo['paths'] = metadata_object['targets']
roleinfo['delegations'] = metadata_object['delegations']
tuf.roledb.add_role(rolename, roleinfo, repository_name)
# Generate the Targets object of the delegated role,
# add it to the top-level 'targets' object and to its
# direct delegating role object.
@ -3165,10 +3186,10 @@ def load_repository(repository_directory, repository_name='default',
targets_objects[delegating_role].add_delegated_role(rolename,
new_targets_object)
# Append the next level delegations to the list:
# Append the next level delegations to the deque:
# the 'delegated' role becomes the 'delegating'
for delegation in metadata_object['delegations']['roles']:
delegations.append([delegation['name'], rolename])
delegations.append((delegation['name'], rolename))
# Extract the keys specified in the delegations field of the Targets
# role. Add 'key_object' to the list of recognized keys. Keys may be