python-tuf/examples/repository/repo
pakagronglb acd7ed08d1 Update Python shebangs to explicitly use python3
Signed-off-by: pakagronglb <pakagronglebel@gmail.com>
2025-02-19 19:44:21 +07:00

142 lines
4.4 KiB
Python
Executable file

#!/usr/bin/env python3
# Copyright 2021-2022 python-tuf contributors
# SPDX-License-Identifier: MIT OR Apache-2.0
"""Simple repository example application
The application stores metadata and targets in memory, and serves them via http.
Nothing is persisted on disk or loaded from disk. The application simulates a
live repository by adding new target files periodically.
"""
import argparse
import logging
import sys
from datetime import datetime
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from time import time
from typing import Dict, List
from _simplerepo import SimpleRepository
from tuf.api.serialization.json import JSONSerializer
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class ReqHandler(BaseHTTPRequestHandler):
    """HTTP handler for the repository example application.

    Serves metadata, targets and a small upload API. All repository state is
    read from (and submitted to) the object available as ``self.server.repo``.
    """

    def do_POST(self):
        """Handle POST requests, aka the 'uploader API'.

        Supported paths:
          * ``/api/delegation/<role>``: body is passed to
            ``repo.submit_delegation(role, data)``
          * ``/api/role/<role>``: body is passed to
            ``repo.submit_role(role, data)``

        Responds 200 on success, 400 if the Content-Length header is unusable
        or the repository rejects the submission, 404 for any other path.
        """
        # The header comes straight from the client: a non-numeric value must
        # not crash the handler, and a negative value would make
        # rfile.read() block until the client closes the connection.
        try:
            content_len = int(self.headers.get("content-length", 0))
        except ValueError:
            return self.send_error(400, "Invalid Content-Length header")
        if content_len < 0:
            return self.send_error(400, "Invalid Content-Length header")

        data = self.rfile.read(content_len)

        if self.path.startswith("/api/delegation/"):
            role = self.path[len("/api/delegation/") :]
            if not self.server.repo.submit_delegation(role, data):
                return self.send_error(400, f"Failed to delegate to {role}")
        elif self.path.startswith("/api/role/"):
            role = self.path[len("/api/role/") :]
            if not self.server.repo.submit_role(role, data):
                return self.send_error(400, f"Failed to submit role {role}")
        else:
            return self.send_error(404)

        self.send_response(200)
        self.end_headers()

    def do_GET(self):
        """Handle GET: metadata and target files.

        ``/metadata/<name>.json`` is resolved with get_metadata() and
        ``/targets/<path>`` with get_target(). Anything that cannot be
        resolved results in a 404 response.
        """
        data = None

        if self.path.startswith("/metadata/") and self.path.endswith(".json"):
            data = self.get_metadata(
                self.path[len("/metadata/") : -len(".json")]
            )
        elif self.path.startswith("/targets/"):
            data = self.get_target(self.path[len("/targets/") :])

        if data is None:
            self.send_error(404)
            return

        self.send_response(200)
        self.send_header("Content-length", str(len(data)))
        self.end_headers()
        self.wfile.write(data)

    def get_metadata(self, ver_and_role: str):
        """Return serialized metadata for a request name, or None.

        Args:
            ver_and_role: Either ``<role>`` (latest version is returned) or
                ``<version>.<role>`` where version is a positive decimal
                integer. Role names may themselves contain dots.

        Returns:
            The result of ``to_bytes(JSONSerializer())`` on the cached
            metadata object, or None when the role is unknown, has no cached
            versions, or the requested version does not exist.
        """
        repo = self.server.repo

        # Only a leading run of ASCII digits counts as a version prefix.
        # Anything else (including "-1" or "abc") is treated as part of the
        # role name, so malformed requests end up as a plain cache miss
        # instead of raising inside the handler.
        prefix, sep, remainder = ver_and_role.partition(".")
        if sep and prefix.isascii() and prefix.isdigit():
            ver = int(prefix)
            role = remainder
            if ver < 1:
                # Versions are looked up as list index ver - 1, so the first
                # valid version is 1
                return None
        else:
            ver = None  # no version prefix: serve the latest version
            role = ver_and_role

        if role not in repo.role_cache:
            return None
        versions = repo.role_cache[role]
        if not versions:
            return None

        if ver is None:
            metadata = versions[-1]
        elif ver > len(versions):
            return None
        else:
            metadata = versions[ver - 1]

        return metadata.to_bytes(JSONSerializer())

    def get_target(self, targetpath: str):
        """Return the content of a target file, or None if it is not cached.

        Args:
            targetpath: Requested path in the form ``[<dir>/]<hash>.<name>``.
                Everything up to and including the first dot of the final
                path component is dropped before the cache lookup.
        """
        repo = self.server.repo

        # Undo the hash-prefixed file naming to get the target_cache key
        # TODO: maybe use hashed paths as the target_cache key
        dirname, sep, hashname = targetpath.rpartition("/")
        _, _, name = hashname.partition(".")
        target = f"{dirname}{sep}{name}"

        if target not in repo.target_cache:
            return None

        # send the target content
        return repo.target_cache[target]
class RepositoryServer(ThreadingHTTPServer):
    """Threading HTTP server on localhost that owns a SimpleRepository.

    Requests are dispatched to ReqHandler, which reaches the repository
    through the ``repo`` attribute of this server.
    """

    def __init__(self, port: int):
        """Bind to 127.0.0.1 on the given port and create the repository."""
        bind_address = ("127.0.0.1", port)
        super().__init__(bind_address, ReqHandler)

        self.repo = SimpleRepository()
        # Make handle_request() return after a second without requests so
        # the caller's loop gets to run regularly
        self.timeout = 1
def main(argv: List[str]) -> None:
    """Example repository server.

    Starts a RepositoryServer on the requested port and serves requests
    forever, adding a new target file to the repository roughly every ten
    seconds to simulate a live repository.

    Args:
        argv: Command line arguments. Recognized options are
            ``-v/--verbose`` (debug logging) and ``-p/--port`` (default
            8001); unrecognized entries are ignored.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("-v", "--verbose", action="count")
    parser.add_argument("-p", "--port", type=int, default=8001)
    # parse_known_args() ignores entries it does not recognize, so callers
    # may pass the complete sys.argv including the program name
    args, _ = parser.parse_known_args(argv)

    level = logging.DEBUG if args.verbose else logging.INFO
    logging.basicConfig(level=level)

    server = RepositoryServer(args.port)
    last_change = 0
    counter = 0

    logger.info(
        "Now serving. Root v1 at http://127.0.0.1:%s/metadata/1.root.json",
        server.server_port,
    )

    try:
        while True:
            # Simulate a live repository: Add a new target file every few
            # seconds
            if time() - last_change > 10:
                last_change = int(time())
                counter += 1
                content = str(datetime.fromtimestamp(last_change))
                server.repo.add_target(f"file{counter}.txt", content)

            # Returns after handling one request or when the server's
            # timeout expires, whichever comes first
            server.handle_request()
    finally:
        # Release the listening socket however the loop ends (for example
        # on Ctrl-C); the exception itself still propagates
        server.server_close()
if __name__ == "__main__":
    # Script entry point: hand the complete argument vector (program name
    # included) to main()
    cli_args = sys.argv
    main(cli_args)