From fd40dfc094ca33f00f153fd4ae44bf5f77441d5a Mon Sep 17 00:00:00 2001 From: Jussi Kukkonen Date: Fri, 8 Oct 2021 10:30:31 +0300 Subject: [PATCH] tests: Refactor simulator signer handling Store signers with their keyids so they are easier to remove. The signers structure now looks like: { "role1": { "keyidA": SSlibSigner, "keyidB": SSlibSigner, } } Add convenience method for adding a signer. Signed-off-by: Jussi Kukkonen --- tests/repository_simulator.py | 24 +++++++++++++----------- tests/test_updater_with_simulator.py | 11 ++++++----- 2 files changed, 19 insertions(+), 16 deletions(-) diff --git a/tests/repository_simulator.py b/tests/repository_simulator.py index 3565fbe0..c17e719d 100644 --- a/tests/repository_simulator.py +++ b/tests/repository_simulator.py @@ -98,7 +98,8 @@ def __init__(self): self.signed_roots: List[bytes] = [] # signers are used on-demand at fetch time to sign metadata - self.signers: Dict[str, List[SSlibSigner]] = {} + # keys are roles, values are dicts of {keyid: signer} + self.signers: Dict[str, Dict[str, SSlibSigner]] = {} # target downloads are served from this dict self.target_files: Dict[str, RepositoryTarget] = {} @@ -139,6 +140,11 @@ def create_key(self) -> Tuple[Key, SSlibSigner]: sslib_key = generate_ed25519_key() return Key.from_securesystemslib_key(sslib_key), SSlibSigner(sslib_key) + def add_signer(self, role:str, signer: SSlibSigner): + if role not in self.signers: + self.signers[role] = {} + self.signers[role][signer.key_dict["keyid"]] = signer + def _initialize(self): """Setup a minimal valid repository""" @@ -159,18 +165,16 @@ def _initialize(self): for role in TOP_LEVEL_ROLE_NAMES: key, signer = self.create_key() root.add_key(role, key) - # store the private key - if role not in self.signers: - self.signers[role] = [] - self.signers[role].append(signer) + self.add_signer(role, signer) + self.md_root = Metadata(root, OrderedDict()) self.publish_root() def publish_root(self): """Sign and store a new serialized version of root""" self.md_root.signatures.clear() - for signer in self.signers["root"]: - self.md_root.sign(signer) + for signer in self.signers["root"].values(): + self.md_root.sign(signer, append=True) self.signed_roots.append(self.md_root.to_bytes(JSONSerializer())) logger.debug("Published root v%d", self.root.version) @@ -242,7 +246,7 @@ def _fetch_metadata(self, role: str, version: Optional[int] = None) -> bytes: raise FetcherHTTPError(f"Unknown {role} version {version}", 404) md.signatures.clear() - for signer in self.signers[role]: + for signer in self.signers[role].values(): md.sign(signer, append=True) logger.debug( @@ -320,9 +324,7 @@ def add_delegation( # By default add one new key for the role key, signer = self.create_key() delegator.add_key(role.name, key) - if role.name not in self.signers: - self.signers[role.name] = [] - self.signers[role.name].append(signer) + self.add_signer(role.name, signer) # Add metadata for the role self.md_delegates[role.name] = Metadata(targets, OrderedDict()) diff --git a/tests/test_updater_with_simulator.py b/tests/test_updater_with_simulator.py index 8cfb9de7..65727aa8 100644 --- a/tests/test_updater_with_simulator.py +++ b/tests/test_updater_with_simulator.py @@ -156,10 +156,10 @@ def test_keys_and_signatures(self): # Update top level metadata self._run_refresh() - # New targets: signed with a new key that is not in roles keys - old_signer = self.sim.signers["targets"].pop() + # New targets: signed with only a new key that is not in roles keys + old_signers = self.sim.signers.pop("targets") key, signer = self.sim.create_key() - self.sim.signers["targets"] = [signer] + self.sim.add_signer("targets", signer) self.sim.targets.version += 1 self.sim.update_snapshot() @@ -182,8 +182,9 @@ def test_keys_and_signatures(self): with self.assertRaises(UnsignedMetadataError): self._run_refresh() - # New targets: sign with both new and old key - self.sim.signers["targets"] = [signer, old_signer] + # New targets: sign with both new and any original keys + for signer in old_signers.values(): + self.sim.add_signer("targets", signer) self.sim.targets.version += 1 self.sim.update_snapshot()