mirror of
https://github.com/theupdateframework/python-tuf
synced 2026-05-24 10:08:28 +00:00
examples: Add further scaffolding for upload API
The API doesn't modify the repository yet but the data flow is there now. Signed-off-by: Jussi Kukkonen <jkukkonen@google.com>
This commit is contained in:
parent
9ec8459379
commit
efcb3cfb80
2 changed files with 40 additions and 6 deletions
|
|
@ -4,6 +4,7 @@
|
|||
"""Simple example of using the repository library to build a repository"""
|
||||
|
||||
import copy
|
||||
import json
|
||||
import logging
|
||||
from collections import defaultdict
|
||||
from datetime import datetime, timedelta
|
||||
|
|
@ -131,3 +132,33 @@ def add_target(self, path: str, content: str) -> None:
|
|||
# update snapshot, timestamp
|
||||
self.snapshot()
|
||||
self.timestamp()
|
||||
|
||||
def submit_delegation(self, role: str, data: bytes) -> bool:
|
||||
try:
|
||||
logger.debug(f"Handling new delegation for role {role}")
|
||||
keyid, keydict = next(iter(json.loads(data).items()))
|
||||
key = Key.from_dict(keyid, keydict)
|
||||
|
||||
# TODO add delegation and key
|
||||
raise NotImplementedError
|
||||
|
||||
except Exception as e:
|
||||
print(e)
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def submit_role(self, role: str, data: bytes) -> bool:
|
||||
try:
|
||||
logger.debug(f"Handling new version for role {role}")
|
||||
md = Metadata.from_bytes(data)
|
||||
|
||||
# TODO add new metadata version
|
||||
raise NotImplementedError
|
||||
|
||||
except Exception as e:
|
||||
print(e)
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
|
|
|
|||
|
|
@ -32,18 +32,21 @@ class ReqHandler(BaseHTTPRequestHandler):
|
|||
"""Handle POST requests, aka the 'API'"""
|
||||
|
||||
content_len = int(self.headers.get('content-length', 0))
|
||||
data = self.rfile.read(content_len)
|
||||
|
||||
if self.path.startswith("/api/delegation/"):
|
||||
role = self.path[len("/api/delegation/"):]
|
||||
if role:
|
||||
raise NotImplementedError
|
||||
if not self.server.repo.submit_delegation(role, data):
|
||||
return self.send_error(400, f"Failed to delegate to {role}")
|
||||
elif self.path.startswith("/api/role/"):
|
||||
role = self.path[len("/api/role/"):]
|
||||
if role:
|
||||
raise NotImplementedError
|
||||
|
||||
self.send_error(404)
|
||||
if not self.server.repo.submit_role(role, data):
|
||||
return self.send_error(400, f"Failed to submit role {role}")
|
||||
else:
|
||||
return self.send_error(404)
|
||||
|
||||
self.send_response(200)
|
||||
self.end_headers()
|
||||
|
||||
def do_GET(self):
|
||||
"""Handle GET: metadata and target files"""
|
||||
|
|
|
|||
Loading…
Reference in a new issue