mirror of
https://github.com/ringhyacinth/Star-Office-UI
synced 2026-04-21 13:27:19 +00:00
2103 lines
79 KiB
Python
2103 lines
79 KiB
Python
#!/usr/bin/env python3
|
||
"""Star Office UI - Backend State Service"""
|
||
|
||
from flask import Flask, jsonify, send_from_directory, make_response, request, session
|
||
from datetime import datetime, timedelta
|
||
import json
|
||
import os
|
||
import random
|
||
import math
|
||
import re
|
||
import shutil
|
||
import subprocess
|
||
import tempfile
|
||
import threading
|
||
from pathlib import Path
|
||
from security_utils import is_production_mode, is_strong_secret, is_strong_drawer_pass
|
||
from memo_utils import get_yesterday_date_str, sanitize_content, extract_memo_from_file
|
||
from store_utils import (
|
||
load_agents_state as _store_load_agents_state,
|
||
save_agents_state as _store_save_agents_state,
|
||
load_asset_positions as _store_load_asset_positions,
|
||
save_asset_positions as _store_save_asset_positions,
|
||
load_asset_defaults as _store_load_asset_defaults,
|
||
save_asset_defaults as _store_save_asset_defaults,
|
||
load_runtime_config as _store_load_runtime_config,
|
||
save_runtime_config as _store_save_runtime_config,
|
||
load_join_keys as _store_load_join_keys,
|
||
save_join_keys as _store_save_join_keys,
|
||
)
|
||
|
||
try:
|
||
from PIL import Image
|
||
except Exception:
|
||
Image = None
|
||
|
||
# Paths (project-relative, no hardcoded absolute paths)
|
||
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
||
MEMORY_DIR = os.path.join(os.path.dirname(ROOT_DIR), "memory")
|
||
FRONTEND_DIR = os.path.join(ROOT_DIR, "frontend")
|
||
FRONTEND_INDEX_FILE = os.path.join(FRONTEND_DIR, "index.html")
|
||
FRONTEND_ELECTRON_STANDALONE_FILE = os.path.join(FRONTEND_DIR, "electron-standalone.html")
|
||
STATE_FILE = os.path.join(ROOT_DIR, "state.json")
|
||
AGENTS_STATE_FILE = os.path.join(ROOT_DIR, "agents-state.json")
|
||
JOIN_KEYS_FILE = os.path.join(ROOT_DIR, "join-keys.json")
|
||
FRONTEND_PATH = Path(FRONTEND_DIR)
|
||
ASSET_ALLOWED_EXTS = {".png", ".webp", ".jpg", ".jpeg", ".gif", ".svg", ".avif"}
|
||
ASSET_TEMPLATE_ZIP = os.path.join(ROOT_DIR, "assets-replace-template.zip")
|
||
WORKSPACE_DIR = os.path.dirname(ROOT_DIR)
|
||
OPENCLAW_WORKSPACE = os.environ.get("OPENCLAW_WORKSPACE") or os.path.join(os.path.expanduser("~"), ".openclaw", "workspace")
|
||
IDENTITY_FILE = os.path.join(OPENCLAW_WORKSPACE, "IDENTITY.md")
|
||
GEMINI_SCRIPT = os.path.join(WORKSPACE_DIR, "skills", "gemini-image-generate", "scripts", "gemini_image_generate.py")
|
||
GEMINI_PYTHON = os.path.join(WORKSPACE_DIR, "skills", "gemini-image-generate", ".venv", "bin", "python")
|
||
ROOM_REFERENCE_IMAGE = (
|
||
os.path.join(ROOT_DIR, "assets", "room-reference.webp")
|
||
if os.path.exists(os.path.join(ROOT_DIR, "assets", "room-reference.webp"))
|
||
else os.path.join(ROOT_DIR, "assets", "room-reference.png")
|
||
)
|
||
BG_HISTORY_DIR = os.path.join(ROOT_DIR, "assets", "bg-history")
|
||
HOME_FAVORITES_DIR = os.path.join(ROOT_DIR, "assets", "home-favorites")
|
||
HOME_FAVORITES_INDEX_FILE = os.path.join(HOME_FAVORITES_DIR, "index.json")
|
||
HOME_FAVORITES_MAX = 30
|
||
ASSET_POSITIONS_FILE = os.path.join(ROOT_DIR, "asset-positions.json")
|
||
|
||
# 性能保护:默认关闭“每次打开页面随机换背景”,避免首页首屏被磁盘复制拖慢
|
||
AUTO_ROTATE_HOME_ON_PAGE_OPEN = (os.getenv("AUTO_ROTATE_HOME_ON_PAGE_OPEN", "0").strip().lower() in {"1", "true", "yes", "on"})
|
||
AUTO_ROTATE_MIN_INTERVAL_SECONDS = int(os.getenv("AUTO_ROTATE_MIN_INTERVAL_SECONDS", "60"))
|
||
_last_home_rotate_at = 0
|
||
ASSET_DEFAULTS_FILE = os.path.join(ROOT_DIR, "asset-defaults.json")
|
||
RUNTIME_CONFIG_FILE = os.path.join(ROOT_DIR, "runtime-config.json")
|
||
|
||
# Canonical agent states: single source of truth for validation and mapping
|
||
VALID_AGENT_STATES = frozenset({"idle", "writing", "researching", "executing", "syncing", "error"})
|
||
WORKING_STATES = frozenset({"writing", "researching", "executing"}) # subset used for auto-idle TTL
|
||
STATE_TO_AREA_MAP = {
|
||
"idle": "breakroom",
|
||
"writing": "writing",
|
||
"researching": "writing",
|
||
"executing": "writing",
|
||
"syncing": "writing",
|
||
"error": "error",
|
||
}
|
||
|
||
|
||
app = Flask(__name__, static_folder=FRONTEND_DIR, static_url_path="/static")
|
||
app.secret_key = os.getenv("FLASK_SECRET_KEY") or os.getenv("STAR_OFFICE_SECRET") or "star-office-dev-secret-change-me"
|
||
|
||
# Session hardening
|
||
app.config.update(
|
||
SESSION_COOKIE_HTTPONLY=True,
|
||
SESSION_COOKIE_SAMESITE="Lax",
|
||
SESSION_COOKIE_SECURE=is_production_mode(),
|
||
PERMANENT_SESSION_LIFETIME=timedelta(hours=12),
|
||
)
|
||
|
||
# Guard join-agent critical section to enforce per-key concurrency under parallel requests
|
||
join_lock = threading.Lock()
|
||
|
||
# Async background task registry for long-running operations (e.g. image generation)
|
||
# Avoids Cloudflare 524 timeout (100s limit) by letting frontend poll for completion.
|
||
_bg_tasks = {} # task_id -> {"status": "pending"|"done"|"error", "result": ..., "error": ..., "created_at": ...}
|
||
_bg_tasks_lock = threading.Lock()
|
||
|
||
# Generate a version timestamp once at server startup for cache busting
|
||
VERSION_TIMESTAMP = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||
ASSET_DRAWER_PASS_DEFAULT = os.getenv("ASSET_DRAWER_PASS", "1234")
|
||
|
||
if is_production_mode():
|
||
hardening_errors = []
|
||
if not is_strong_secret(str(app.secret_key)):
|
||
hardening_errors.append("FLASK_SECRET_KEY / STAR_OFFICE_SECRET is weak (need >=24 chars, non-default)")
|
||
if not is_strong_drawer_pass(ASSET_DRAWER_PASS_DEFAULT):
|
||
hardening_errors.append("ASSET_DRAWER_PASS is weak (do not use default 1234; recommend >=8 chars)")
|
||
if hardening_errors:
|
||
raise RuntimeError("Security hardening check failed in production mode: " + "; ".join(hardening_errors))
|
||
|
||
|
||
def _is_asset_editor_authed() -> bool:
|
||
return bool(session.get("asset_editor_authed"))
|
||
|
||
|
||
def _require_asset_editor_auth():
|
||
if _is_asset_editor_authed():
|
||
return None
|
||
return jsonify({"ok": False, "code": "UNAUTHORIZED", "msg": "Asset editor auth required"}), 401
|
||
|
||
|
||
@app.after_request
|
||
def add_no_cache_headers(response):
|
||
"""Apply cache policy by path:
|
||
- HTML/API/state: no-cache (always fresh)
|
||
- /static assets (2xx only): long cache (filenames are versioned with ?v=VERSION_TIMESTAMP)
|
||
- /static assets (non-2xx, e.g. 404): no-cache to prevent CDN from caching errors
|
||
"""
|
||
path = (request.path or "")
|
||
if path.startswith('/static/') and 200 <= response.status_code < 300:
|
||
response.headers["Cache-Control"] = "public, max-age=31536000, immutable"
|
||
response.headers.pop("Pragma", None)
|
||
response.headers.pop("Expires", None)
|
||
else:
|
||
response.headers["Cache-Control"] = "no-cache, no-store, must-revalidate, max-age=0"
|
||
response.headers["Pragma"] = "no-cache"
|
||
response.headers["Expires"] = "0"
|
||
return response
|
||
|
||
# Default state
|
||
DEFAULT_STATE = {
|
||
"state": "idle",
|
||
"detail": "等待任务中...",
|
||
"progress": 0,
|
||
"updated_at": datetime.now().isoformat()
|
||
}
|
||
|
||
|
||
def load_state():
|
||
"""Load state from file.
|
||
|
||
Includes a simple auto-idle mechanism:
|
||
- If the last update is older than ttl_seconds (default 25s)
|
||
and the state is a "working" state, we fall back to idle.
|
||
|
||
This avoids the UI getting stuck at the desk when no new updates arrive.
|
||
"""
|
||
state = None
|
||
if os.path.exists(STATE_FILE):
|
||
try:
|
||
with open(STATE_FILE, "r", encoding="utf-8") as f:
|
||
state = json.load(f)
|
||
except Exception:
|
||
state = None
|
||
|
||
if not isinstance(state, dict):
|
||
state = dict(DEFAULT_STATE)
|
||
|
||
# Auto-idle
|
||
try:
|
||
ttl = int(state.get("ttl_seconds", 300))
|
||
updated_at = state.get("updated_at")
|
||
s = state.get("state", "idle")
|
||
if updated_at and s in WORKING_STATES:
|
||
# tolerate both with/without timezone
|
||
dt = datetime.fromisoformat(updated_at.replace("Z", "+00:00"))
|
||
# Use UTC for aware datetimes; local time for naive.
|
||
if dt.tzinfo:
|
||
from datetime import timezone
|
||
age = (datetime.now(timezone.utc) - dt.astimezone(timezone.utc)).total_seconds()
|
||
else:
|
||
age = (datetime.now() - dt).total_seconds()
|
||
if age > ttl:
|
||
state["state"] = "idle"
|
||
state["detail"] = "待命中(自动回到休息区)"
|
||
state["progress"] = 0
|
||
state["updated_at"] = datetime.now().isoformat()
|
||
# persist the auto-idle so every client sees it consistently
|
||
try:
|
||
save_state(state)
|
||
except Exception:
|
||
pass
|
||
except Exception:
|
||
pass
|
||
|
||
return state
|
||
|
||
|
||
def get_office_name_from_identity():
|
||
"""Read office display name from OpenClaw workspace IDENTITY.md (Name field) -> 'XXX的办公室'."""
|
||
if not os.path.isfile(IDENTITY_FILE):
|
||
return None
|
||
try:
|
||
with open(IDENTITY_FILE, "r", encoding="utf-8") as f:
|
||
content = f.read()
|
||
m = re.search(r"-\s*\*\*Name:\*\*\s*(.+)", content)
|
||
if m:
|
||
name = m.group(1).strip().replace("\r", "").split("\n")[0].strip()
|
||
return f"{name}的办公室" if name else None
|
||
except Exception:
|
||
pass
|
||
return None
|
||
|
||
|
||
def save_state(state: dict):
|
||
"""Save state to file"""
|
||
with open(STATE_FILE, "w", encoding="utf-8") as f:
|
||
json.dump(state, f, ensure_ascii=False, indent=2)
|
||
|
||
|
||
def ensure_electron_standalone_snapshot():
|
||
"""Create Electron standalone frontend snapshot once if missing.
|
||
|
||
The snapshot is intentionally decoupled from the browser page:
|
||
- browser uses frontend/index.html
|
||
- Electron uses frontend/electron-standalone.html
|
||
"""
|
||
if os.path.exists(FRONTEND_ELECTRON_STANDALONE_FILE):
|
||
return
|
||
try:
|
||
shutil.copy2(FRONTEND_INDEX_FILE, FRONTEND_ELECTRON_STANDALONE_FILE)
|
||
print(f"[standalone] created: {FRONTEND_ELECTRON_STANDALONE_FILE}")
|
||
except Exception as e:
|
||
print(f"[standalone] create failed: {e}")
|
||
|
||
|
||
# Initialize state
|
||
if not os.path.exists(STATE_FILE):
|
||
save_state(DEFAULT_STATE)
|
||
ensure_electron_standalone_snapshot()
|
||
|
||
|
||
_INDEX_HTML_CACHE = None
|
||
|
||
|
||
@app.route("/", methods=["GET"])
|
||
def index():
|
||
"""Serve the pixel office UI with built-in version cache busting"""
|
||
# 默认禁用页面打开即换背景,避免首屏慢
|
||
# 如需启用,可配置 AUTO_ROTATE_HOME_ON_PAGE_OPEN=1
|
||
_maybe_apply_random_home_favorite()
|
||
|
||
global _INDEX_HTML_CACHE
|
||
if _INDEX_HTML_CACHE is None:
|
||
with open(FRONTEND_INDEX_FILE, "r", encoding="utf-8") as f:
|
||
raw_html = f.read()
|
||
_INDEX_HTML_CACHE = raw_html.replace("{{VERSION_TIMESTAMP}}", VERSION_TIMESTAMP)
|
||
|
||
resp = make_response(_INDEX_HTML_CACHE)
|
||
resp.headers["Content-Type"] = "text/html; charset=utf-8"
|
||
return resp
|
||
|
||
|
||
@app.route("/electron-standalone", methods=["GET"])
|
||
def electron_standalone_page():
|
||
"""Serve Electron-only standalone frontend page."""
|
||
ensure_electron_standalone_snapshot()
|
||
target = FRONTEND_ELECTRON_STANDALONE_FILE
|
||
if not os.path.exists(target):
|
||
target = FRONTEND_INDEX_FILE
|
||
with open(target, "r", encoding="utf-8") as f:
|
||
html = f.read()
|
||
html = html.replace("{{VERSION_TIMESTAMP}}", VERSION_TIMESTAMP)
|
||
resp = make_response(html)
|
||
resp.headers["Content-Type"] = "text/html; charset=utf-8"
|
||
return resp
|
||
|
||
resp.headers["Content-Type"] = "text/html; charset=utf-8"
|
||
return resp
|
||
|
||
|
||
@app.route("/join", methods=["GET"])
|
||
def join_page():
|
||
"""Serve the agent join page"""
|
||
with open(os.path.join(FRONTEND_DIR, "join.html"), "r", encoding="utf-8") as f:
|
||
html = f.read()
|
||
resp = make_response(html)
|
||
resp.headers["Content-Type"] = "text/html; charset=utf-8"
|
||
return resp
|
||
|
||
|
||
@app.route("/invite", methods=["GET"])
|
||
def invite_page():
|
||
"""Serve human-facing invite instruction page"""
|
||
with open(os.path.join(FRONTEND_DIR, "invite.html"), "r", encoding="utf-8") as f:
|
||
html = f.read()
|
||
resp = make_response(html)
|
||
resp.headers["Content-Type"] = "text/html; charset=utf-8"
|
||
return resp
|
||
|
||
|
||
DEFAULT_AGENTS = [
|
||
{
|
||
"agentId": "star",
|
||
"name": "Star",
|
||
"isMain": True,
|
||
"state": "idle",
|
||
"detail": "待命中,随时准备为你服务",
|
||
"updated_at": datetime.now().isoformat(),
|
||
"area": "breakroom",
|
||
"source": "local",
|
||
"joinKey": None,
|
||
"authStatus": "approved",
|
||
"authExpiresAt": None,
|
||
"lastPushAt": None
|
||
}
|
||
]
|
||
|
||
|
||
def load_agents_state():
|
||
return _store_load_agents_state(AGENTS_STATE_FILE, DEFAULT_AGENTS)
|
||
|
||
|
||
def save_agents_state(agents):
|
||
_store_save_agents_state(AGENTS_STATE_FILE, agents)
|
||
|
||
|
||
def load_asset_positions():
|
||
return _store_load_asset_positions(ASSET_POSITIONS_FILE)
|
||
|
||
|
||
def save_asset_positions(data):
|
||
_store_save_asset_positions(ASSET_POSITIONS_FILE, data)
|
||
|
||
|
||
def load_asset_defaults():
|
||
return _store_load_asset_defaults(ASSET_DEFAULTS_FILE)
|
||
|
||
|
||
def save_asset_defaults(data):
|
||
_store_save_asset_defaults(ASSET_DEFAULTS_FILE, data)
|
||
|
||
|
||
def load_runtime_config():
|
||
return _store_load_runtime_config(RUNTIME_CONFIG_FILE)
|
||
|
||
|
||
def save_runtime_config(data):
|
||
_store_save_runtime_config(RUNTIME_CONFIG_FILE, data)
|
||
|
||
|
||
def _ensure_home_favorites_index():
|
||
os.makedirs(HOME_FAVORITES_DIR, exist_ok=True)
|
||
if not os.path.exists(HOME_FAVORITES_INDEX_FILE):
|
||
with open(HOME_FAVORITES_INDEX_FILE, "w", encoding="utf-8") as f:
|
||
json.dump({"items": []}, f, ensure_ascii=False, indent=2)
|
||
|
||
|
||
def _load_home_favorites_index():
|
||
_ensure_home_favorites_index()
|
||
try:
|
||
with open(HOME_FAVORITES_INDEX_FILE, "r", encoding="utf-8") as f:
|
||
data = json.load(f)
|
||
if isinstance(data, dict) and isinstance(data.get("items"), list):
|
||
return data
|
||
except Exception:
|
||
pass
|
||
return {"items": []}
|
||
|
||
|
||
def _save_home_favorites_index(data):
|
||
_ensure_home_favorites_index()
|
||
with open(HOME_FAVORITES_INDEX_FILE, "w", encoding="utf-8") as f:
|
||
json.dump(data, f, ensure_ascii=False, indent=2)
|
||
|
||
|
||
def _maybe_apply_random_home_favorite():
|
||
"""On page open, randomly apply one saved home favorite if available."""
|
||
global _last_home_rotate_at
|
||
|
||
if not AUTO_ROTATE_HOME_ON_PAGE_OPEN:
|
||
return False, "disabled"
|
||
|
||
try:
|
||
now_ts = datetime.now().timestamp()
|
||
if _last_home_rotate_at and (now_ts - _last_home_rotate_at) < AUTO_ROTATE_MIN_INTERVAL_SECONDS:
|
||
return False, "throttled"
|
||
|
||
idx = _load_home_favorites_index()
|
||
items = idx.get("items") or []
|
||
candidates = []
|
||
for it in items:
|
||
rel = (it.get("path") or "").strip()
|
||
if not rel:
|
||
continue
|
||
abs_path = os.path.join(ROOT_DIR, rel)
|
||
if os.path.exists(abs_path):
|
||
candidates.append((rel, abs_path))
|
||
|
||
if not candidates:
|
||
return False, "no-favorites"
|
||
|
||
rel, src = random.choice(candidates)
|
||
target = FRONTEND_PATH / "office_bg_small.webp"
|
||
if not target.exists():
|
||
return False, "missing-office-bg"
|
||
|
||
shutil.copy2(src, str(target))
|
||
_last_home_rotate_at = now_ts
|
||
return True, rel
|
||
except Exception as e:
|
||
return False, str(e)
|
||
|
||
|
||
def load_join_keys():
|
||
return _store_load_join_keys(JOIN_KEYS_FILE)
|
||
|
||
|
||
def save_join_keys(data):
|
||
_store_save_join_keys(JOIN_KEYS_FILE, data)
|
||
|
||
|
||
def _ensure_magick_or_ffmpeg_available():
|
||
if shutil.which("magick"):
|
||
return "magick"
|
||
if shutil.which("ffmpeg"):
|
||
return "ffmpeg"
|
||
return None
|
||
|
||
|
||
def _probe_animated_frame_size(upload_path: str):
|
||
"""Return (w,h) from first frame if possible."""
|
||
if Image is not None:
|
||
try:
|
||
with Image.open(upload_path) as im:
|
||
w, h = im.size
|
||
return int(w), int(h)
|
||
except Exception:
|
||
pass
|
||
# ffprobe fallback
|
||
if shutil.which("ffprobe"):
|
||
try:
|
||
cmd = [
|
||
"ffprobe", "-v", "error",
|
||
"-select_streams", "v:0",
|
||
"-show_entries", "stream=width,height",
|
||
"-of", "csv=p=0:s=x",
|
||
upload_path,
|
||
]
|
||
out = subprocess.check_output(cmd, stderr=subprocess.STDOUT, timeout=5).decode().strip()
|
||
if "x" in out:
|
||
w, h = out.split("x", 1)
|
||
return int(w), int(h)
|
||
except Exception:
|
||
pass
|
||
return None, None
|
||
|
||
|
||
def _animated_to_spritesheet(
|
||
upload_path: str,
|
||
frame_w: int,
|
||
frame_h: int,
|
||
out_ext: str = ".webp",
|
||
preserve_original: bool = True,
|
||
pixel_art: bool = True,
|
||
cols: int | None = None,
|
||
rows: int | None = None,
|
||
):
|
||
"""Convert animated GIF/WEBP to spritesheet, return (out_path, columns, rows, frames, out_frame_w, out_frame_h)."""
|
||
backend = _ensure_magick_or_ffmpeg_available()
|
||
if not backend:
|
||
raise RuntimeError("未检测到 ImageMagick/ffmpeg,无法自动转换动图")
|
||
|
||
ext = (out_ext or ".webp").lower()
|
||
if ext not in {".webp", ".png"}:
|
||
ext = ".webp"
|
||
|
||
out_fd, out_path = tempfile.mkstemp(suffix=ext)
|
||
os.close(out_fd)
|
||
|
||
with tempfile.TemporaryDirectory() as td:
|
||
frames = 0
|
||
out_fw, out_fh = int(frame_w), int(frame_h)
|
||
if Image is not None:
|
||
try:
|
||
with Image.open(upload_path) as im:
|
||
n = getattr(im, "n_frames", 1)
|
||
# 默认保留用户原始帧尺寸(避免先压缩再放大导致像素糊)
|
||
if preserve_original:
|
||
out_fw, out_fh = im.size
|
||
for i in range(n):
|
||
im.seek(i)
|
||
fr = im.convert("RGBA")
|
||
if not preserve_original and (fr.size != (out_fw, out_fh)):
|
||
resample = Image.Resampling.NEAREST if pixel_art else Image.Resampling.LANCZOS
|
||
fr = fr.resize((out_fw, out_fh), resample)
|
||
fr.save(os.path.join(td, f"f_{i:04d}.png"), "PNG")
|
||
frames = n
|
||
except Exception:
|
||
frames = 0
|
||
|
||
if frames <= 0:
|
||
cmd1 = f"ffmpeg -y -i '{upload_path}' '{td}/f_%04d.png' >/dev/null 2>&1"
|
||
if os.system(cmd1) != 0:
|
||
raise RuntimeError("动图抽帧失败(Pillow/ffmpeg 都失败)")
|
||
files = sorted([x for x in os.listdir(td) if x.startswith("f_") and x.endswith(".png")])
|
||
frames = len(files)
|
||
if frames <= 0:
|
||
raise RuntimeError("动图无有效帧")
|
||
|
||
if backend == "magick":
|
||
# 像素风动图转精灵表默认无损,避免颜色/边缘被压缩糊掉
|
||
quality_flag = "-define webp:lossless=true -define webp:method=6 -quality 100" if ext == ".webp" else ""
|
||
# 允许按 cols/rows 排布;默认单行
|
||
if cols is None or cols <= 0:
|
||
cols_eff = frames
|
||
else:
|
||
cols_eff = max(1, int(cols))
|
||
rows_eff = max(1, int(rows)) if (rows is not None and rows > 0) else max(1, math.ceil(frames / cols_eff))
|
||
|
||
# 先规范单帧尺寸
|
||
prep = ""
|
||
if not preserve_original:
|
||
magick_filter = "-filter point" if pixel_art else ""
|
||
prep = f" {magick_filter} -resize {out_fw}x{out_fh}^ -gravity center -background none -extent {out_fw}x{out_fh}"
|
||
|
||
cmd = (
|
||
f"magick '{td}/f_*.png'{prep} "
|
||
f"-tile {cols_eff}x{rows_eff} -background none -geometry +0+0 {quality_flag} '{out_path}'"
|
||
)
|
||
rc = os.system(cmd)
|
||
if rc != 0:
|
||
raise RuntimeError("ImageMagick 拼图失败")
|
||
return out_path, cols_eff, rows_eff, frames, out_fw, out_fh
|
||
|
||
ffmpeg_quality = "-lossless 1 -compression_level 6 -q:v 100" if ext == ".webp" else ""
|
||
cols_eff = max(1, int(cols)) if (cols is not None and cols > 0) else frames
|
||
rows_eff = max(1, int(rows)) if (rows is not None and rows > 0) else max(1, math.ceil(frames / cols_eff))
|
||
if preserve_original:
|
||
vf = f"tile={cols_eff}x{rows_eff}"
|
||
else:
|
||
scale_algo = "neighbor" if pixel_art else "lanczos"
|
||
vf = (
|
||
f"scale={out_fw}:{out_fh}:force_original_aspect_ratio=decrease:flags={scale_algo},"
|
||
f"pad={out_fw}:{out_fh}:(ow-iw)/2:(oh-ih)/2:color=0x00000000,"
|
||
f"tile={cols_eff}x{rows_eff}"
|
||
)
|
||
cmd2 = (
|
||
f"ffmpeg -y -pattern_type glob -i '{td}/f_*.png' "
|
||
f"-vf '{vf}' "
|
||
f"{ffmpeg_quality} '{out_path}' >/dev/null 2>&1"
|
||
)
|
||
if os.system(cmd2) != 0:
|
||
raise RuntimeError("ffmpeg 拼图失败")
|
||
return out_path, frames, 1, frames, out_fw, out_fh
|
||
|
||
|
||
def normalize_agent_state(s):
|
||
"""Normalize agent state for compatibility.
|
||
Maps synonyms (e.g. working/busy -> writing, run/running -> executing) into VALID_AGENT_STATES.
|
||
Returns 'idle' for unknown values.
|
||
"""
|
||
if not s:
|
||
return 'idle'
|
||
s_lower = s.lower().strip()
|
||
if s_lower in {'working', 'busy', 'write'}:
|
||
return 'writing'
|
||
if s_lower in {'run', 'running', 'execute', 'exec'}:
|
||
return 'executing'
|
||
if s_lower in {'sync'}:
|
||
return 'syncing'
|
||
if s_lower in {'research', 'search'}:
|
||
return 'researching'
|
||
if s_lower in VALID_AGENT_STATES:
|
||
return s_lower
|
||
return 'idle'
|
||
|
||
|
||
# User-facing model aliases -> provider model ids
|
||
USER_MODEL_TO_PROVIDER_MODELS = {
|
||
# 严格按用户要求:仅两种官方模型映射
|
||
"nanobanana-pro": [
|
||
"nano-banana-pro-preview",
|
||
],
|
||
"nanobanana-2": [
|
||
"gemini-2.5-flash-image",
|
||
],
|
||
}
|
||
|
||
PROVIDER_MODEL_TO_USER_MODEL = {
|
||
provider: user
|
||
for user, providers in USER_MODEL_TO_PROVIDER_MODELS.items()
|
||
for provider in providers
|
||
}
|
||
|
||
|
||
def _normalize_user_model(model_name: str) -> str:
|
||
m = (model_name or "").strip()
|
||
if not m:
|
||
return "nanobanana-pro"
|
||
low = m.lower()
|
||
if low in USER_MODEL_TO_PROVIDER_MODELS:
|
||
return low
|
||
if low in PROVIDER_MODEL_TO_USER_MODEL:
|
||
return PROVIDER_MODEL_TO_USER_MODEL[low]
|
||
return "nanobanana-pro"
|
||
|
||
|
||
def _provider_model_candidates(user_model: str):
|
||
normalized = _normalize_user_model(user_model)
|
||
return list(USER_MODEL_TO_PROVIDER_MODELS.get(normalized, USER_MODEL_TO_PROVIDER_MODELS["nanobanana-pro"]))
|
||
|
||
|
||
def _generate_rpg_background_to_webp(out_webp_path: str, width: int = 1280, height: int = 720, custom_prompt: str = "", speed_mode: str = "fast"):
|
||
"""Generate RPG-style room background and save as webp.
|
||
|
||
speed_mode:
|
||
- fast: use nanobanana-2 + 1024x576 intermediate + downscaled reference (faster)
|
||
- quality: use configured model (fallback nanobanana-pro) + full 1280x720 path
|
||
"""
|
||
runtime_cfg = load_runtime_config()
|
||
api_key = (runtime_cfg.get("gemini_api_key") or "").strip()
|
||
if not api_key:
|
||
raise RuntimeError("MISSING_API_KEY")
|
||
themes = [
|
||
"8-bit dungeon guild room",
|
||
"8-bit stardew-valley inspired cozy farm tavern",
|
||
"8-bit nordic fantasy tavern",
|
||
"8-bit magitech workshop",
|
||
"8-bit elven forest inn",
|
||
"8-bit pixel cyber tavern",
|
||
"8-bit desert caravan inn",
|
||
"8-bit snow mountain lodge",
|
||
]
|
||
theme = random.choice(themes)
|
||
|
||
if not (os.path.exists(GEMINI_PYTHON) and os.path.exists(GEMINI_SCRIPT)):
|
||
raise RuntimeError("生图脚本环境缺失:gemini-image-generate 未安装")
|
||
|
||
style_hint = (custom_prompt or "").strip()
|
||
if not style_hint:
|
||
style_hint = theme
|
||
|
||
# 默认使用更稳妥的 quality 档,避免 fast 模型在部分 API 通道不可用
|
||
mode = (speed_mode or "quality").strip().lower()
|
||
if mode not in {"fast", "quality"}:
|
||
mode = "quality"
|
||
|
||
configured_user_model = _normalize_user_model(runtime_cfg.get("gemini_model") or "nanobanana-pro")
|
||
if mode == "fast":
|
||
preferred_user_model = "nanobanana-2"
|
||
# fast 也提高基础清晰度:从 1024x576 提升到 1152x648(牺牲少量速度)
|
||
gen_width, gen_height = 1152, 648
|
||
ref_width, ref_height = 1152, 648
|
||
else:
|
||
preferred_user_model = configured_user_model
|
||
gen_width, gen_height = width, height
|
||
ref_width, ref_height = width, height
|
||
|
||
# 同时规避可能触发 400 的特殊能力参数:
|
||
# 仅 nanobanana-2 走 aspect-ratio,nanobanana-pro 交给模型默认比例(后续再标准化到 1280x720)
|
||
allow_aspect_ratio = (preferred_user_model == "nanobanana-2")
|
||
|
||
prompt = (
|
||
"Use a top-down pixel room composition compatible with an office game scene. "
|
||
"STRICTLY preserve the same room geometry, camera angle, wall/floor boundaries and major object placement as the provided reference image. "
|
||
"Keep region layout stable (left work area, center lounge, right error area). "
|
||
"Only change visual style/theme/material/lighting according to: " + style_hint + ". "
|
||
"Do not add text or watermark. Retro 8-bit RPG style."
|
||
)
|
||
|
||
tmp_dir = tempfile.mkdtemp(prefix="rpg-bg-")
|
||
cmd = [
|
||
GEMINI_PYTHON,
|
||
GEMINI_SCRIPT,
|
||
"--prompt", prompt,
|
||
"--model", configured_user_model,
|
||
"--out-dir", tmp_dir,
|
||
"--cleanup",
|
||
]
|
||
if allow_aspect_ratio:
|
||
cmd.extend(["--aspect-ratio", "16:9"])
|
||
|
||
# 强约束:每次都带固定参考图,保持房间区域布局不漂移
|
||
ref_for_call = None
|
||
if os.path.exists(ROOM_REFERENCE_IMAGE):
|
||
ref_for_call = ROOM_REFERENCE_IMAGE
|
||
if mode == "fast" and Image is not None:
|
||
try:
|
||
ref_fast = os.path.join(tmp_dir, "room-reference-fast.webp")
|
||
with Image.open(ROOM_REFERENCE_IMAGE) as rim:
|
||
rim = rim.convert("RGBA").resize((ref_width, ref_height), Image.Resampling.LANCZOS)
|
||
rim.save(ref_fast, "WEBP", quality=85, method=4)
|
||
ref_for_call = ref_fast
|
||
except Exception:
|
||
ref_for_call = ROOM_REFERENCE_IMAGE
|
||
|
||
if ref_for_call:
|
||
cmd.extend(["--reference-image", ref_for_call])
|
||
|
||
env = os.environ.copy()
|
||
# 运行时配置优先:只保留 GEMINI_API_KEY,避免脚本因双 key 报错
|
||
env.pop("GOOGLE_API_KEY", None)
|
||
env["GEMINI_API_KEY"] = api_key
|
||
|
||
def _run_cmd(cmd_args):
|
||
return subprocess.run(cmd_args, capture_output=True, text=True, env=env, timeout=240)
|
||
|
||
def _is_model_unavailable_error(text: str) -> bool:
|
||
low = (text or "").strip().lower()
|
||
return (
|
||
("not found" in low and "models/" in low)
|
||
or ("model_not_available" in low)
|
||
or ("model is not available" in low)
|
||
or ("configured model is not available" in low)
|
||
or ("this model is not available" in low)
|
||
or ("not supported for generatecontent" in low)
|
||
)
|
||
|
||
def _with_model(cmd_args, model_name: str):
|
||
m = cmd_args[:]
|
||
if "--model" in m:
|
||
idx = m.index("--model")
|
||
if idx + 1 < len(m):
|
||
m[idx + 1] = model_name
|
||
else:
|
||
m.extend(["--model", model_name])
|
||
return m
|
||
|
||
# 模型多级回退(仅允许两类用户模型:nanobanana-pro / nanobanana-2)
|
||
# 每个用户模型映射到若干 provider 真实模型。
|
||
user_model_order = [preferred_user_model, configured_user_model]
|
||
user_model_order = [m for i, m in enumerate(user_model_order) if m and m not in user_model_order[:i]]
|
||
|
||
model_candidates = []
|
||
for um in user_model_order:
|
||
model_candidates.extend(_provider_model_candidates(um))
|
||
# 去重并清理空项
|
||
model_candidates = [m for i, m in enumerate(model_candidates) if m and m not in model_candidates[:i]]
|
||
|
||
proc = None
|
||
last_err_text = ""
|
||
model_unavailable_count = 0
|
||
|
||
for mname in model_candidates:
|
||
env["GEMINI_MODEL"] = mname
|
||
try_cmd = _with_model(cmd, mname)
|
||
proc = _run_cmd(try_cmd)
|
||
if proc.returncode == 0:
|
||
break
|
||
|
||
err_text = (proc.stderr or proc.stdout or "").strip()
|
||
last_err_text = err_text
|
||
|
||
# key 失效/泄漏:立即终止,不继续尝试
|
||
low = err_text.lower()
|
||
if "your api key was reported as leaked" in low or "permission_denied" in low:
|
||
raise RuntimeError("API_KEY_REVOKED_OR_LEAKED")
|
||
|
||
if _is_model_unavailable_error(err_text):
|
||
model_unavailable_count += 1
|
||
continue
|
||
|
||
# 非模型不可用错误,直接返回真实错误
|
||
raise RuntimeError(f"生图失败: {err_text}")
|
||
|
||
if proc is None or proc.returncode != 0:
|
||
err_text = (last_err_text or "").strip()
|
||
if model_unavailable_count >= len(model_candidates) or _is_model_unavailable_error(err_text):
|
||
brief = (err_text or "").replace("\n", " ")[:240]
|
||
raise RuntimeError(f"MODEL_NOT_AVAILABLE::{brief}")
|
||
raise RuntimeError(f"生图失败: {err_text}")
|
||
|
||
try:
|
||
result = json.loads(proc.stdout.strip().splitlines()[-1])
|
||
except Exception:
|
||
raise RuntimeError("生图结果解析失败")
|
||
|
||
files = result.get("files") or []
|
||
if not files:
|
||
raise RuntimeError("生图未返回文件")
|
||
|
||
gen_path = files[0]
|
||
if not os.path.exists(gen_path):
|
||
raise RuntimeError("生图文件不存在")
|
||
|
||
if Image is None:
|
||
raise RuntimeError("Pillow 不可用,无法做尺寸标准化")
|
||
|
||
with Image.open(gen_path) as im:
|
||
im = im.convert("RGBA")
|
||
# 质量模式优先保细节;快速模式优先速度
|
||
if mode == "fast":
|
||
im = im.resize((gen_width, gen_height), Image.Resampling.LANCZOS)
|
||
if (gen_width, gen_height) != (width, height):
|
||
# fast 的放大改为 LANCZOS,牺牲少量速度换更高细节
|
||
im = im.resize((width, height), Image.Resampling.LANCZOS)
|
||
im.save(out_webp_path, "WEBP", quality=96, method=6)
|
||
else:
|
||
# quality:确保输出标准尺寸,同时使用无损 webp,减少压缩损失
|
||
if im.size != (width, height):
|
||
im = im.resize((width, height), Image.Resampling.LANCZOS)
|
||
im.save(out_webp_path, "WEBP", lossless=True, quality=100, method=6)
|
||
|
||
|
||
def state_to_area(state):
|
||
"""Map agent state to office area (breakroom / writing / error)."""
|
||
return STATE_TO_AREA_MAP.get(state, "breakroom")
|
||
|
||
|
||
# Ensure files exist
|
||
if not os.path.exists(AGENTS_STATE_FILE):
|
||
save_agents_state(DEFAULT_AGENTS)
|
||
if not os.path.exists(JOIN_KEYS_FILE):
|
||
if os.path.exists(os.path.join(ROOT_DIR, "join-keys.sample.json")):
|
||
try:
|
||
with open(os.path.join(ROOT_DIR, "join-keys.sample.json"), "r", encoding="utf-8") as sf:
|
||
sample = json.load(sf)
|
||
save_join_keys(sample if isinstance(sample, dict) else {"keys": []})
|
||
except Exception:
|
||
save_join_keys({"keys": []})
|
||
else:
|
||
save_join_keys({"keys": []})
|
||
|
||
# Tighten runtime-config file perms if exists
|
||
if os.path.exists(RUNTIME_CONFIG_FILE):
|
||
try:
|
||
os.chmod(RUNTIME_CONFIG_FILE, 0o600)
|
||
except Exception:
|
||
pass
|
||
|
||
|
||
@app.route("/agents", methods=["GET"])
|
||
def get_agents():
|
||
"""Get full agents list (for multi-agent UI), with auto-cleanup on access"""
|
||
agents = load_agents_state()
|
||
now = datetime.now()
|
||
|
||
cleaned_agents = []
|
||
keys_data = load_join_keys()
|
||
|
||
for a in agents:
|
||
if a.get("isMain"):
|
||
cleaned_agents.append(a)
|
||
continue
|
||
|
||
auth_expires_at_str = a.get("authExpiresAt")
|
||
auth_status = a.get("authStatus", "pending")
|
||
|
||
# 1) 超时未批准自动 leave
|
||
if auth_status == "pending" and auth_expires_at_str:
|
||
try:
|
||
auth_expires_at = datetime.fromisoformat(auth_expires_at_str)
|
||
if now > auth_expires_at:
|
||
key = a.get("joinKey")
|
||
if key:
|
||
key_item = next((k for k in keys_data.get("keys", []) if k.get("key") == key), None)
|
||
if key_item:
|
||
key_item["used"] = False
|
||
key_item["usedBy"] = None
|
||
key_item["usedByAgentId"] = None
|
||
key_item["usedAt"] = None
|
||
continue
|
||
except Exception:
|
||
pass
|
||
|
||
# 2) 超时未推送自动离线(超过5分钟)
|
||
last_push_at_str = a.get("lastPushAt")
|
||
if auth_status == "approved" and last_push_at_str:
|
||
try:
|
||
last_push_at = datetime.fromisoformat(last_push_at_str)
|
||
age = (now - last_push_at).total_seconds()
|
||
if age > 300: # 5分钟无推送自动离线
|
||
a["authStatus"] = "offline"
|
||
except Exception:
|
||
pass
|
||
|
||
cleaned_agents.append(a)
|
||
|
||
save_agents_state(cleaned_agents)
|
||
save_join_keys(keys_data)
|
||
|
||
return jsonify(cleaned_agents)
|
||
|
||
|
||
@app.route("/agent-approve", methods=["POST"])
|
||
def agent_approve():
|
||
"""Approve an agent (set authStatus to approved)"""
|
||
try:
|
||
data = request.get_json()
|
||
agent_id = (data.get("agentId") or "").strip()
|
||
if not agent_id:
|
||
return jsonify({"ok": False, "msg": "缺少 agentId"}), 400
|
||
|
||
agents = load_agents_state()
|
||
target = next((a for a in agents if a.get("agentId") == agent_id and not a.get("isMain")), None)
|
||
if not target:
|
||
return jsonify({"ok": False, "msg": "未找到 agent"}), 404
|
||
|
||
target["authStatus"] = "approved"
|
||
target["authApprovedAt"] = datetime.now().isoformat()
|
||
target["authExpiresAt"] = (datetime.now() + timedelta(hours=24)).isoformat() # 默认授权24h
|
||
|
||
save_agents_state(agents)
|
||
return jsonify({"ok": True, "agentId": agent_id, "authStatus": "approved"})
|
||
except Exception as e:
|
||
return jsonify({"ok": False, "msg": str(e)}), 500
|
||
|
||
|
||
@app.route("/agent-reject", methods=["POST"])
|
||
def agent_reject():
|
||
"""Reject an agent (set authStatus to rejected and optionally revoke key)"""
|
||
try:
|
||
data = request.get_json()
|
||
agent_id = (data.get("agentId") or "").strip()
|
||
if not agent_id:
|
||
return jsonify({"ok": False, "msg": "缺少 agentId"}), 400
|
||
|
||
agents = load_agents_state()
|
||
target = next((a for a in agents if a.get("agentId") == agent_id and not a.get("isMain")), None)
|
||
if not target:
|
||
return jsonify({"ok": False, "msg": "未找到 agent"}), 404
|
||
|
||
target["authStatus"] = "rejected"
|
||
target["authRejectedAt"] = datetime.now().isoformat()
|
||
|
||
# Optionally free join key back to unused
|
||
join_key = target.get("joinKey")
|
||
keys_data = load_join_keys()
|
||
if join_key:
|
||
key_item = next((k for k in keys_data.get("keys", []) if k.get("key") == join_key), None)
|
||
if key_item:
|
||
key_item["used"] = False
|
||
key_item["usedBy"] = None
|
||
key_item["usedByAgentId"] = None
|
||
key_item["usedAt"] = None
|
||
|
||
# Remove from agents list
|
||
agents = [a for a in agents if a.get("agentId") != agent_id or a.get("isMain")]
|
||
|
||
save_agents_state(agents)
|
||
save_join_keys(keys_data)
|
||
return jsonify({"ok": True, "agentId": agent_id, "authStatus": "rejected"})
|
||
except Exception as e:
|
||
return jsonify({"ok": False, "msg": str(e)}), 500
|
||
|
||
|
||
@app.route("/join-agent", methods=["POST"])
|
||
def join_agent():
|
||
"""Add a new agent with one-time join key validation and pending auth"""
|
||
try:
|
||
data = request.get_json()
|
||
if not isinstance(data, dict) or not data.get("name"):
|
||
return jsonify({"ok": False, "msg": "请提供名字"}), 400
|
||
|
||
name = data["name"].strip()
|
||
state = data.get("state", "idle")
|
||
detail = data.get("detail", "")
|
||
join_key = data.get("joinKey", "").strip()
|
||
|
||
# Normalize state early for compatibility
|
||
state = normalize_agent_state(state)
|
||
|
||
if not join_key:
|
||
return jsonify({"ok": False, "msg": "请提供接入密钥"}), 400
|
||
|
||
keys_data = load_join_keys()
|
||
key_item = next((k for k in keys_data.get("keys", []) if k.get("key") == join_key), None)
|
||
if not key_item:
|
||
return jsonify({"ok": False, "msg": "接入密钥无效"}), 403
|
||
# key 可复用:不再因为 used=true 拒绝
|
||
|
||
with join_lock:
|
||
# 在锁内重新读取,避免并发请求都基于同一旧快照通过校验
|
||
keys_data = load_join_keys()
|
||
key_item = next((k for k in keys_data.get("keys", []) if k.get("key") == join_key), None)
|
||
if not key_item:
|
||
return jsonify({"ok": False, "msg": "接入密钥无效"}), 403
|
||
|
||
# Key-level expiration check
|
||
key_expires_at_str = key_item.get("expiresAt")
|
||
if key_expires_at_str:
|
||
try:
|
||
key_expires_at = datetime.fromisoformat(key_expires_at_str)
|
||
if datetime.now() > key_expires_at:
|
||
return jsonify({"ok": False, "msg": "该接入密钥已过期,活动已结束 🎉"}), 403
|
||
except Exception:
|
||
pass
|
||
|
||
agents = load_agents_state()
|
||
|
||
# 并发上限:同一个 key “同时在线”最多 3 个。
|
||
# 在线判定:lastPushAt/updated_at 在 5 分钟内;否则视为 offline,不计入并发。
|
||
now = datetime.now()
|
||
existing = next((a for a in agents if a.get("name") == name and not a.get("isMain")), None)
|
||
existing_id = existing.get("agentId") if existing else None
|
||
|
||
def _age_seconds(dt_str):
|
||
if not dt_str:
|
||
return None
|
||
try:
|
||
dt = datetime.fromisoformat(dt_str)
|
||
return (now - dt).total_seconds()
|
||
except Exception:
|
||
return None
|
||
|
||
# opportunistic offline marking
|
||
for a in agents:
|
||
if a.get("isMain"):
|
||
continue
|
||
if a.get("authStatus") != "approved":
|
||
continue
|
||
age = _age_seconds(a.get("lastPushAt"))
|
||
if age is None:
|
||
age = _age_seconds(a.get("updated_at"))
|
||
if age is not None and age > 300:
|
||
a["authStatus"] = "offline"
|
||
|
||
max_concurrent = int(key_item.get("maxConcurrent", 3))
|
||
active_count = 0
|
||
for a in agents:
|
||
if a.get("isMain"):
|
||
continue
|
||
if a.get("agentId") == existing_id:
|
||
continue
|
||
if a.get("joinKey") != join_key:
|
||
continue
|
||
if a.get("authStatus") != "approved":
|
||
continue
|
||
age = _age_seconds(a.get("lastPushAt"))
|
||
if age is None:
|
||
age = _age_seconds(a.get("updated_at"))
|
||
if age is None or age <= 300:
|
||
active_count += 1
|
||
|
||
if active_count >= max_concurrent:
|
||
save_agents_state(agents)
|
||
return jsonify({"ok": False, "msg": f"该接入密钥当前并发已达上限({max_concurrent}),请稍后或换另一个 key"}), 429
|
||
|
||
if existing:
|
||
existing["state"] = state
|
||
existing["detail"] = detail
|
||
existing["updated_at"] = datetime.now().isoformat()
|
||
existing["area"] = state_to_area(state)
|
||
existing["source"] = "remote-openclaw"
|
||
existing["joinKey"] = join_key
|
||
existing["authStatus"] = "approved"
|
||
existing["authApprovedAt"] = datetime.now().isoformat()
|
||
existing["authExpiresAt"] = (datetime.now() + timedelta(hours=24)).isoformat()
|
||
existing["lastPushAt"] = datetime.now().isoformat() # join 视为上线,纳入并发/离线判定
|
||
if not existing.get("avatar"):
|
||
import random
|
||
existing["avatar"] = random.choice(["guest_role_1", "guest_role_2", "guest_role_3", "guest_role_4", "guest_role_5", "guest_role_6"])
|
||
agent_id = existing.get("agentId")
|
||
else:
|
||
# Use ms + random suffix to avoid collisions under concurrent joins
|
||
import random
|
||
import string
|
||
agent_id = "agent_" + str(int(datetime.now().timestamp() * 1000)) + "_" + "".join(random.choices(string.ascii_lowercase + string.digits, k=4))
|
||
agents.append({
|
||
"agentId": agent_id,
|
||
"name": name,
|
||
"isMain": False,
|
||
"state": state,
|
||
"detail": detail,
|
||
"updated_at": datetime.now().isoformat(),
|
||
"area": state_to_area(state),
|
||
"source": "remote-openclaw",
|
||
"joinKey": join_key,
|
||
"authStatus": "approved",
|
||
"authApprovedAt": datetime.now().isoformat(),
|
||
"authExpiresAt": (datetime.now() + timedelta(hours=24)).isoformat(),
|
||
"lastPushAt": datetime.now().isoformat(),
|
||
"avatar": random.choice(["guest_role_1", "guest_role_2", "guest_role_3", "guest_role_4", "guest_role_5", "guest_role_6"])
|
||
})
|
||
|
||
key_item["used"] = True
|
||
key_item["usedBy"] = name
|
||
key_item["usedByAgentId"] = agent_id
|
||
key_item["usedAt"] = datetime.now().isoformat()
|
||
key_item["reusable"] = True
|
||
|
||
# 拿到有效 key 直接批准,不再等待主人手动点击
|
||
# (状态已在上面 existing/new 分支写入)
|
||
save_agents_state(agents)
|
||
save_join_keys(keys_data)
|
||
|
||
return jsonify({"ok": True, "agentId": agent_id, "authStatus": "approved", "nextStep": "已自动批准,立即开始推送状态"})
|
||
except Exception as e:
|
||
return jsonify({"ok": False, "msg": str(e)}), 500
|
||
|
||
|
||
@app.route("/leave-agent", methods=["POST"])
|
||
def leave_agent():
|
||
"""Remove an agent and free its one-time join key for reuse (optional)
|
||
|
||
Prefer agentId (stable). Name is accepted for backward compatibility.
|
||
"""
|
||
try:
|
||
data = request.get_json()
|
||
if not isinstance(data, dict):
|
||
return jsonify({"ok": False, "msg": "invalid json"}), 400
|
||
|
||
agent_id = (data.get("agentId") or "").strip()
|
||
name = (data.get("name") or "").strip()
|
||
if not agent_id and not name:
|
||
return jsonify({"ok": False, "msg": "请提供 agentId 或名字"}), 400
|
||
|
||
agents = load_agents_state()
|
||
|
||
target = None
|
||
if agent_id:
|
||
target = next((a for a in agents if a.get("agentId") == agent_id and not a.get("isMain")), None)
|
||
if (not target) and name:
|
||
# fallback: remove by name only if agentId not provided
|
||
target = next((a for a in agents if a.get("name") == name and not a.get("isMain")), None)
|
||
|
||
if not target:
|
||
return jsonify({"ok": False, "msg": "没有找到要离开的 agent"}), 404
|
||
|
||
join_key = target.get("joinKey")
|
||
new_agents = [a for a in agents if a.get("isMain") or a.get("agentId") != target.get("agentId")]
|
||
|
||
# Optional: free key back to unused after leave
|
||
keys_data = load_join_keys()
|
||
if join_key:
|
||
key_item = next((k for k in keys_data.get("keys", []) if k.get("key") == join_key), None)
|
||
if key_item:
|
||
key_item["used"] = False
|
||
key_item["usedBy"] = None
|
||
key_item["usedByAgentId"] = None
|
||
key_item["usedAt"] = None
|
||
|
||
save_agents_state(new_agents)
|
||
save_join_keys(keys_data)
|
||
return jsonify({"ok": True})
|
||
except Exception as e:
|
||
return jsonify({"ok": False, "msg": str(e)}), 500
|
||
|
||
|
||
@app.route("/status", methods=["GET"])
|
||
def get_status():
|
||
"""Get current main state (backward compatibility). Optionally include officeName from IDENTITY.md."""
|
||
state = load_state()
|
||
office_name = get_office_name_from_identity()
|
||
if office_name:
|
||
state["officeName"] = office_name
|
||
return jsonify(state)
|
||
|
||
|
||
@app.route("/agent-push", methods=["POST"])
|
||
def agent_push():
|
||
"""Remote openclaw actively pushes status to office.
|
||
|
||
Required fields:
|
||
- agentId
|
||
- joinKey
|
||
- state
|
||
Optional:
|
||
- detail
|
||
- name
|
||
"""
|
||
try:
|
||
data = request.get_json()
|
||
if not isinstance(data, dict):
|
||
return jsonify({"ok": False, "msg": "invalid json"}), 400
|
||
|
||
agent_id = (data.get("agentId") or "").strip()
|
||
join_key = (data.get("joinKey") or "").strip()
|
||
state = (data.get("state") or "").strip()
|
||
detail = (data.get("detail") or "").strip()
|
||
name = (data.get("name") or "").strip()
|
||
|
||
if not agent_id or not join_key or not state:
|
||
return jsonify({"ok": False, "msg": "缺少 agentId/joinKey/state"}), 400
|
||
|
||
state = normalize_agent_state(state)
|
||
|
||
keys_data = load_join_keys()
|
||
key_item = next((k for k in keys_data.get("keys", []) if k.get("key") == join_key), None)
|
||
if not key_item:
|
||
return jsonify({"ok": False, "msg": "joinKey 无效"}), 403
|
||
|
||
# Key-level expiration check
|
||
key_expires_at_str = key_item.get("expiresAt")
|
||
if key_expires_at_str:
|
||
try:
|
||
key_expires_at = datetime.fromisoformat(key_expires_at_str)
|
||
if datetime.now() > key_expires_at:
|
||
return jsonify({"ok": False, "msg": "该接入密钥已过期,活动已结束 🎉"}), 403
|
||
except Exception:
|
||
pass
|
||
|
||
|
||
agents = load_agents_state()
|
||
target = next((a for a in agents if a.get("agentId") == agent_id and not a.get("isMain")), None)
|
||
if not target:
|
||
return jsonify({"ok": False, "msg": "agent 未注册,请先 join"}), 404
|
||
|
||
# Auth check: only approved agents can push.
|
||
# Note: "offline" is a presence state (stale), not a revoked authorization.
|
||
# Allow offline agents to resume pushing and auto-promote them back to approved.
|
||
auth_status = target.get("authStatus", "pending")
|
||
if auth_status not in {"approved", "offline"}:
|
||
return jsonify({"ok": False, "msg": "agent 未获授权,请等待主人批准"}), 403
|
||
if auth_status == "offline":
|
||
target["authStatus"] = "approved"
|
||
target["authApprovedAt"] = datetime.now().isoformat()
|
||
target["authExpiresAt"] = (datetime.now() + timedelta(hours=24)).isoformat()
|
||
|
||
if target.get("joinKey") != join_key:
|
||
return jsonify({"ok": False, "msg": "joinKey 不匹配"}), 403
|
||
|
||
target["state"] = state
|
||
target["detail"] = detail
|
||
if name:
|
||
target["name"] = name
|
||
target["updated_at"] = datetime.now().isoformat()
|
||
target["area"] = state_to_area(state)
|
||
target["source"] = "remote-openclaw"
|
||
target["lastPushAt"] = datetime.now().isoformat()
|
||
|
||
save_agents_state(agents)
|
||
return jsonify({"ok": True, "agentId": agent_id, "area": target.get("area")})
|
||
except Exception as e:
|
||
return jsonify({"ok": False, "msg": str(e)}), 500
|
||
|
||
|
||
@app.route("/health", methods=["GET"])
|
||
def health():
|
||
"""Health check"""
|
||
return jsonify({
|
||
"status": "ok",
|
||
"service": "star-office-ui",
|
||
"timestamp": datetime.now().isoformat(),
|
||
})
|
||
|
||
|
||
@app.route("/yesterday-memo", methods=["GET"])
|
||
def get_yesterday_memo():
|
||
"""获取昨日小日记"""
|
||
try:
|
||
# 先尝试找昨天的文件
|
||
yesterday_str = get_yesterday_date_str()
|
||
yesterday_file = os.path.join(MEMORY_DIR, f"{yesterday_str}.md")
|
||
|
||
target_file = None
|
||
target_date = yesterday_str
|
||
|
||
if os.path.exists(yesterday_file):
|
||
target_file = yesterday_file
|
||
else:
|
||
# 如果昨天没有,找最近的一天
|
||
if os.path.exists(MEMORY_DIR):
|
||
files = [f for f in os.listdir(MEMORY_DIR) if f.endswith(".md") and re.match(r"\d{4}-\d{2}-\d{2}\.md", f)]
|
||
if files:
|
||
files.sort(reverse=True)
|
||
# 跳过今天的(如果存在)
|
||
today_str = datetime.now().strftime("%Y-%m-%d")
|
||
for f in files:
|
||
if f != f"{today_str}.md":
|
||
target_file = os.path.join(MEMORY_DIR, f)
|
||
target_date = f.replace(".md", "")
|
||
break
|
||
|
||
if target_file and os.path.exists(target_file):
|
||
memo_content = extract_memo_from_file(target_file)
|
||
return jsonify({
|
||
"success": True,
|
||
"date": target_date,
|
||
"memo": memo_content
|
||
})
|
||
else:
|
||
return jsonify({
|
||
"success": False,
|
||
"msg": "没有找到昨日日记"
|
||
})
|
||
except Exception as e:
|
||
return jsonify({
|
||
"success": False,
|
||
"msg": str(e)
|
||
}), 500
|
||
|
||
|
||
@app.route("/set_state", methods=["POST"])
|
||
def set_state_endpoint():
|
||
"""Set state via POST (for UI control panel)"""
|
||
try:
|
||
data = request.get_json()
|
||
if not isinstance(data, dict):
|
||
return jsonify({"status": "error", "msg": "invalid json"}), 400
|
||
state = load_state()
|
||
if "state" in data:
|
||
s = data["state"]
|
||
if s in VALID_AGENT_STATES:
|
||
state["state"] = s
|
||
if "detail" in data:
|
||
state["detail"] = data["detail"]
|
||
state["updated_at"] = datetime.now().isoformat()
|
||
save_state(state)
|
||
return jsonify({"status": "ok"})
|
||
except Exception as e:
|
||
return jsonify({"status": "error", "msg": str(e)}), 500
|
||
|
||
|
||
@app.route("/assets/template.zip", methods=["GET"])
|
||
def assets_template_download():
|
||
if not os.path.exists(ASSET_TEMPLATE_ZIP):
|
||
return jsonify({"ok": False, "msg": "模板包不存在,请先生成"}), 404
|
||
return send_from_directory(ROOT_DIR, "assets-replace-template.zip", as_attachment=True)
|
||
|
||
|
||
@app.route("/assets/list", methods=["GET"])
|
||
def assets_list():
|
||
items = []
|
||
for p in FRONTEND_PATH.rglob("*"):
|
||
if not p.is_file():
|
||
continue
|
||
rel = p.relative_to(FRONTEND_PATH).as_posix()
|
||
if rel.startswith("fonts/"):
|
||
continue
|
||
if p.suffix.lower() not in ASSET_ALLOWED_EXTS:
|
||
continue
|
||
st = p.stat()
|
||
width = None
|
||
height = None
|
||
if Image is not None:
|
||
try:
|
||
with Image.open(p) as im:
|
||
width, height = im.size
|
||
except Exception:
|
||
pass
|
||
items.append({
|
||
"path": rel,
|
||
"size": st.st_size,
|
||
"ext": p.suffix.lower(),
|
||
"width": width,
|
||
"height": height,
|
||
"mtime": datetime.fromtimestamp(st.st_mtime).isoformat(),
|
||
})
|
||
items.sort(key=lambda x: x["path"])
|
||
return jsonify({"ok": True, "count": len(items), "items": items})
|
||
|
||
|
||
def _bg_generate_worker(task_id: str, custom_prompt: str, speed_mode: str):
|
||
"""Background worker for RPG background generation."""
|
||
try:
|
||
target = FRONTEND_PATH / "office_bg_small.webp"
|
||
|
||
# 覆盖前保留最近一次备份
|
||
bak = target.with_suffix(target.suffix + ".bak")
|
||
shutil.copy2(target, bak)
|
||
|
||
_generate_rpg_background_to_webp(
|
||
str(target),
|
||
width=1280,
|
||
height=720,
|
||
custom_prompt=custom_prompt,
|
||
speed_mode=speed_mode,
|
||
)
|
||
|
||
# 每次生成都归档一份历史底图(可回溯风格演化)
|
||
os.makedirs(BG_HISTORY_DIR, exist_ok=True)
|
||
ts = datetime.now().strftime("%Y%m%d-%H%M%S")
|
||
hist_file = os.path.join(BG_HISTORY_DIR, f"office_bg_small-{ts}.webp")
|
||
shutil.copy2(target, hist_file)
|
||
|
||
st = target.stat()
|
||
with _bg_tasks_lock:
|
||
_bg_tasks[task_id] = {
|
||
"status": "done",
|
||
"result": {
|
||
"ok": True,
|
||
"path": "office_bg_small.webp",
|
||
"size": st.st_size,
|
||
"history": os.path.relpath(hist_file, ROOT_DIR),
|
||
"speed_mode": speed_mode,
|
||
"msg": "已生成并替换 RPG 房间底图(已自动归档)",
|
||
},
|
||
}
|
||
except Exception as e:
|
||
msg = str(e)
|
||
error_result = {"ok": False, "msg": msg}
|
||
if msg == "MISSING_API_KEY":
|
||
error_result["code"] = "MISSING_API_KEY"
|
||
error_result["msg"] = "Missing GEMINI_API_KEY or GOOGLE_API_KEY"
|
||
elif msg == "API_KEY_REVOKED_OR_LEAKED":
|
||
error_result["code"] = "API_KEY_REVOKED_OR_LEAKED"
|
||
error_result["msg"] = "API key is revoked or flagged as leaked. Please rotate to a new key."
|
||
elif msg.startswith("MODEL_NOT_AVAILABLE"):
|
||
error_result["code"] = "MODEL_NOT_AVAILABLE"
|
||
error_result["msg"] = "Configured model is not available for this API key/channel."
|
||
if "::" in msg:
|
||
error_result["detail"] = msg.split("::", 1)[1]
|
||
with _bg_tasks_lock:
|
||
_bg_tasks[task_id] = {"status": "error", "result": error_result}
|
||
|
||
|
||
@app.route("/assets/generate-rpg-background", methods=["POST"])
|
||
def assets_generate_rpg_background():
|
||
"""Start async RPG background generation. Returns a task_id for polling."""
|
||
guard = _require_asset_editor_auth()
|
||
if guard:
|
||
return guard
|
||
try:
|
||
req = request.get_json(silent=True) or {}
|
||
custom_prompt = (req.get("prompt") or "").strip() if isinstance(req, dict) else ""
|
||
speed_mode = (req.get("speed_mode") or "quality").strip().lower() if isinstance(req, dict) else "quality"
|
||
if speed_mode not in {"fast", "quality"}:
|
||
speed_mode = "fast"
|
||
|
||
target = FRONTEND_PATH / "office_bg_small.webp"
|
||
if not target.exists():
|
||
return jsonify({"ok": False, "msg": "office_bg_small.webp 不存在"}), 404
|
||
|
||
# Pre-flight checks that can fail fast (before spawning thread)
|
||
runtime_cfg = load_runtime_config()
|
||
api_key = (runtime_cfg.get("gemini_api_key") or "").strip()
|
||
if not api_key:
|
||
return jsonify({"ok": False, "code": "MISSING_API_KEY", "msg": "Missing GEMINI_API_KEY or GOOGLE_API_KEY"}), 400
|
||
if not (os.path.exists(GEMINI_PYTHON) and os.path.exists(GEMINI_SCRIPT)):
|
||
return jsonify({"ok": False, "msg": "生图脚本环境缺失:gemini-image-generate 未安装"}), 500
|
||
|
||
# Check if another generation is already running
|
||
with _bg_tasks_lock:
|
||
for tid, task in _bg_tasks.items():
|
||
if task.get("status") == "pending":
|
||
return jsonify({"ok": True, "async": True, "task_id": tid, "msg": "已有生图任务进行中,请等待完成"}), 200
|
||
|
||
# Create async task
|
||
import string as _string
|
||
task_id = "gen_" + str(int(datetime.now().timestamp() * 1000)) + "_" + "".join(random.choices(_string.ascii_lowercase + _string.digits, k=4))
|
||
with _bg_tasks_lock:
|
||
_bg_tasks[task_id] = {"status": "pending", "created_at": datetime.now().isoformat()}
|
||
|
||
t = threading.Thread(target=_bg_generate_worker, args=(task_id, custom_prompt, speed_mode), daemon=True)
|
||
t.start()
|
||
|
||
return jsonify({"ok": True, "async": True, "task_id": task_id, "msg": "生图任务已启动,请通过 task_id 轮询结果"})
|
||
except Exception as e:
|
||
return jsonify({"ok": False, "msg": str(e)}), 500
|
||
|
||
|
||
@app.route("/assets/generate-rpg-background/poll", methods=["GET"])
|
||
def assets_generate_rpg_background_poll():
|
||
"""Poll async generation task status."""
|
||
guard = _require_asset_editor_auth()
|
||
if guard:
|
||
return guard
|
||
task_id = (request.args.get("task_id") or "").strip()
|
||
if not task_id:
|
||
return jsonify({"ok": False, "msg": "缺少 task_id"}), 400
|
||
with _bg_tasks_lock:
|
||
task = _bg_tasks.get(task_id)
|
||
if not task:
|
||
return jsonify({"ok": False, "msg": "任务不存在"}), 404
|
||
status = task.get("status", "pending")
|
||
if status == "pending":
|
||
return jsonify({"ok": True, "status": "pending", "msg": "生图进行中..."})
|
||
elif status == "done":
|
||
# Clean up task after delivering result
|
||
with _bg_tasks_lock:
|
||
_bg_tasks.pop(task_id, None)
|
||
return jsonify({"ok": True, "status": "done", **task.get("result", {})})
|
||
else:
|
||
with _bg_tasks_lock:
|
||
_bg_tasks.pop(task_id, None)
|
||
result = task.get("result", {})
|
||
code = 400 if result.get("code") else 500
|
||
return jsonify({"ok": False, "status": "error", **result}), code
|
||
|
||
|
||
@app.route("/assets/restore-reference-background", methods=["POST"])
|
||
def assets_restore_reference_background():
|
||
"""Restore office_bg_small.webp from fixed reference image."""
|
||
guard = _require_asset_editor_auth()
|
||
if guard:
|
||
return guard
|
||
try:
|
||
target = FRONTEND_PATH / "office_bg_small.webp"
|
||
if not target.exists():
|
||
return jsonify({"ok": False, "msg": "office_bg_small.webp 不存在"}), 404
|
||
if not os.path.exists(ROOM_REFERENCE_IMAGE):
|
||
return jsonify({"ok": False, "msg": "参考图不存在"}), 404
|
||
|
||
# 备份当前底图
|
||
bak = target.with_suffix(target.suffix + ".bak")
|
||
shutil.copy2(target, bak)
|
||
|
||
# 快速路径:若参考图已是 1280x720 的 webp,直接拷贝(秒级)
|
||
ref_ext = os.path.splitext(ROOM_REFERENCE_IMAGE)[1].lower()
|
||
fast_copied = False
|
||
if ref_ext == '.webp':
|
||
try:
|
||
with Image.open(ROOM_REFERENCE_IMAGE) as rim:
|
||
if rim.size == (1280, 720):
|
||
shutil.copy2(ROOM_REFERENCE_IMAGE, target)
|
||
fast_copied = True
|
||
except Exception:
|
||
fast_copied = False
|
||
|
||
# 慢路径:仅在必要时重编码
|
||
if not fast_copied:
|
||
if Image is None:
|
||
return jsonify({"ok": False, "msg": "Pillow 不可用"}), 500
|
||
with Image.open(ROOM_REFERENCE_IMAGE) as im:
|
||
im = im.convert("RGBA").resize((1280, 720), Image.Resampling.LANCZOS)
|
||
im.save(target, "WEBP", quality=92, method=6)
|
||
|
||
st = target.stat()
|
||
return jsonify({
|
||
"ok": True,
|
||
"path": "office_bg_small.webp",
|
||
"size": st.st_size,
|
||
"msg": "已恢复初始底图",
|
||
})
|
||
except Exception as e:
|
||
return jsonify({"ok": False, "msg": str(e)}), 500
|
||
|
||
|
||
@app.route("/assets/restore-last-generated-background", methods=["POST"])
|
||
def assets_restore_last_generated_background():
|
||
"""Restore office_bg_small.webp from latest bg-history snapshot."""
|
||
guard = _require_asset_editor_auth()
|
||
if guard:
|
||
return guard
|
||
try:
|
||
target = FRONTEND_PATH / "office_bg_small.webp"
|
||
if not target.exists():
|
||
return jsonify({"ok": False, "msg": "office_bg_small.webp 不存在"}), 404
|
||
|
||
if not os.path.isdir(BG_HISTORY_DIR):
|
||
return jsonify({"ok": False, "msg": "暂无历史底图"}), 404
|
||
|
||
files = [
|
||
os.path.join(BG_HISTORY_DIR, x)
|
||
for x in os.listdir(BG_HISTORY_DIR)
|
||
if x.startswith("office_bg_small-") and x.endswith(".webp")
|
||
]
|
||
if not files:
|
||
return jsonify({"ok": False, "msg": "暂无历史底图"}), 404
|
||
|
||
latest = max(files, key=lambda p: os.path.getmtime(p))
|
||
|
||
bak = target.with_suffix(target.suffix + ".bak")
|
||
shutil.copy2(target, bak)
|
||
shutil.copy2(latest, target)
|
||
|
||
st = target.stat()
|
||
return jsonify({
|
||
"ok": True,
|
||
"path": "office_bg_small.webp",
|
||
"size": st.st_size,
|
||
"from": os.path.relpath(latest, ROOT_DIR),
|
||
"msg": "已回退到最近一次生成底图",
|
||
})
|
||
except Exception as e:
|
||
return jsonify({"ok": False, "msg": str(e)}), 500
|
||
|
||
|
||
@app.route("/assets/home-favorites/list", methods=["GET"])
|
||
def assets_home_favorites_list():
|
||
guard = _require_asset_editor_auth()
|
||
if guard:
|
||
return guard
|
||
try:
|
||
data = _load_home_favorites_index()
|
||
items = data.get("items") or []
|
||
out = []
|
||
for it in items:
|
||
rel = (it.get("path") or "").strip()
|
||
if not rel:
|
||
continue
|
||
abs_path = os.path.join(ROOT_DIR, rel)
|
||
if not os.path.exists(abs_path):
|
||
continue
|
||
fn = os.path.basename(rel)
|
||
out.append({
|
||
"id": it.get("id"),
|
||
"path": rel,
|
||
"url": f"/assets/home-favorites/file/{fn}",
|
||
"thumb_url": f"/assets/home-favorites/file/{fn}",
|
||
"created_at": it.get("created_at") or "",
|
||
})
|
||
out.sort(key=lambda x: x.get("created_at") or "", reverse=True)
|
||
return jsonify({"ok": True, "items": out})
|
||
except Exception as e:
|
||
return jsonify({"ok": False, "msg": str(e)}), 500
|
||
|
||
|
||
@app.route("/assets/home-favorites/file/<path:filename>", methods=["GET"])
|
||
def assets_home_favorites_file(filename):
|
||
guard = _require_asset_editor_auth()
|
||
if guard:
|
||
return guard
|
||
return send_from_directory(HOME_FAVORITES_DIR, filename)
|
||
|
||
|
||
@app.route("/assets/home-favorites/save-current", methods=["POST"])
|
||
def assets_home_favorites_save_current():
|
||
guard = _require_asset_editor_auth()
|
||
if guard:
|
||
return guard
|
||
try:
|
||
src = FRONTEND_PATH / "office_bg_small.webp"
|
||
if not src.exists():
|
||
return jsonify({"ok": False, "msg": "office_bg_small.webp 不存在"}), 404
|
||
|
||
_ensure_home_favorites_index()
|
||
ts = datetime.now().strftime("%Y%m%d-%H%M%S")
|
||
item_id = f"home-{ts}"
|
||
fn = f"{item_id}.webp"
|
||
dst = os.path.join(HOME_FAVORITES_DIR, fn)
|
||
shutil.copy2(str(src), dst)
|
||
|
||
idx = _load_home_favorites_index()
|
||
items = idx.get("items") or []
|
||
items.insert(0, {
|
||
"id": item_id,
|
||
"path": os.path.relpath(dst, ROOT_DIR),
|
||
"created_at": datetime.now().isoformat(timespec="seconds"),
|
||
})
|
||
|
||
# 控制收藏数量上限,清理最旧项
|
||
if len(items) > HOME_FAVORITES_MAX:
|
||
extra = items[HOME_FAVORITES_MAX:]
|
||
items = items[:HOME_FAVORITES_MAX]
|
||
for it in extra:
|
||
try:
|
||
p = os.path.join(ROOT_DIR, it.get("path") or "")
|
||
if os.path.exists(p):
|
||
os.remove(p)
|
||
except Exception:
|
||
pass
|
||
|
||
idx["items"] = items
|
||
_save_home_favorites_index(idx)
|
||
return jsonify({"ok": True, "id": item_id, "path": os.path.relpath(dst, ROOT_DIR), "msg": "已收藏当前地图"})
|
||
except Exception as e:
|
||
return jsonify({"ok": False, "msg": str(e)}), 500
|
||
|
||
|
||
@app.route("/assets/home-favorites/delete", methods=["POST"])
|
||
def assets_home_favorites_delete():
|
||
guard = _require_asset_editor_auth()
|
||
if guard:
|
||
return guard
|
||
try:
|
||
data = request.get_json(silent=True) or {}
|
||
item_id = (data.get("id") or "").strip()
|
||
if not item_id:
|
||
return jsonify({"ok": False, "msg": "缺少 id"}), 400
|
||
|
||
idx = _load_home_favorites_index()
|
||
items = idx.get("items") or []
|
||
hit = next((x for x in items if (x.get("id") or "") == item_id), None)
|
||
if not hit:
|
||
return jsonify({"ok": False, "msg": "收藏项不存在"}), 404
|
||
|
||
rel = hit.get("path") or ""
|
||
abs_path = os.path.join(ROOT_DIR, rel)
|
||
if os.path.exists(abs_path):
|
||
try:
|
||
os.remove(abs_path)
|
||
except Exception:
|
||
pass
|
||
|
||
idx["items"] = [x for x in items if (x.get("id") or "") != item_id]
|
||
_save_home_favorites_index(idx)
|
||
return jsonify({"ok": True, "id": item_id, "msg": "已删除收藏"})
|
||
except Exception as e:
|
||
return jsonify({"ok": False, "msg": str(e)}), 500
|
||
|
||
|
||
@app.route("/assets/home-favorites/apply", methods=["POST"])
|
||
def assets_home_favorites_apply():
|
||
guard = _require_asset_editor_auth()
|
||
if guard:
|
||
return guard
|
||
try:
|
||
data = request.get_json(silent=True) or {}
|
||
item_id = (data.get("id") or "").strip()
|
||
if not item_id:
|
||
return jsonify({"ok": False, "msg": "缺少 id"}), 400
|
||
|
||
idx = _load_home_favorites_index()
|
||
items = idx.get("items") or []
|
||
hit = next((x for x in items if (x.get("id") or "") == item_id), None)
|
||
if not hit:
|
||
return jsonify({"ok": False, "msg": "收藏项不存在"}), 404
|
||
|
||
src = os.path.join(ROOT_DIR, hit.get("path") or "")
|
||
if not os.path.exists(src):
|
||
return jsonify({"ok": False, "msg": "收藏文件不存在"}), 404
|
||
|
||
target = FRONTEND_PATH / "office_bg_small.webp"
|
||
if not target.exists():
|
||
return jsonify({"ok": False, "msg": "office_bg_small.webp 不存在"}), 404
|
||
|
||
bak = target.with_suffix(target.suffix + ".bak")
|
||
shutil.copy2(str(target), str(bak))
|
||
shutil.copy2(src, str(target))
|
||
|
||
st = target.stat()
|
||
return jsonify({"ok": True, "path": "office_bg_small.webp", "size": st.st_size, "from": hit.get("path"), "msg": "已应用收藏地图"})
|
||
except Exception as e:
|
||
return jsonify({"ok": False, "msg": str(e)}), 500
|
||
|
||
|
||
@app.route("/assets/auth", methods=["POST"])
|
||
def assets_auth():
|
||
try:
|
||
data = request.get_json(silent=True) or {}
|
||
pwd = (data.get("password") or "").strip()
|
||
if pwd and pwd == ASSET_DRAWER_PASS_DEFAULT:
|
||
session["asset_editor_authed"] = True
|
||
return jsonify({"ok": True, "msg": "认证成功"})
|
||
return jsonify({"ok": False, "msg": "验证码错误"}), 401
|
||
except Exception as e:
|
||
return jsonify({"ok": False, "msg": str(e)}), 500
|
||
|
||
|
||
@app.route("/assets/auth/status", methods=["GET"])
|
||
def assets_auth_status():
|
||
return jsonify({
|
||
"ok": True,
|
||
"authed": _is_asset_editor_authed(),
|
||
"drawer_default_pass": ASSET_DRAWER_PASS_DEFAULT == "1234",
|
||
})
|
||
|
||
|
||
@app.route("/assets/positions", methods=["GET"])
|
||
def assets_positions_get():
|
||
guard = _require_asset_editor_auth()
|
||
if guard:
|
||
return guard
|
||
try:
|
||
return jsonify({"ok": True, "items": load_asset_positions()})
|
||
except Exception as e:
|
||
return jsonify({"ok": False, "msg": str(e)}), 500
|
||
|
||
|
||
@app.route("/assets/positions", methods=["POST"])
|
||
def assets_positions_set():
|
||
guard = _require_asset_editor_auth()
|
||
if guard:
|
||
return guard
|
||
try:
|
||
data = request.get_json(silent=True) or {}
|
||
key = (data.get("key") or "").strip()
|
||
x = data.get("x")
|
||
y = data.get("y")
|
||
scale = data.get("scale")
|
||
if not key:
|
||
return jsonify({"ok": False, "msg": "缺少 key"}), 400
|
||
if x is None or y is None:
|
||
return jsonify({"ok": False, "msg": "缺少 x/y"}), 400
|
||
x = float(x)
|
||
y = float(y)
|
||
if scale is None:
|
||
scale = 1.0
|
||
scale = float(scale)
|
||
|
||
all_pos = load_asset_positions()
|
||
all_pos[key] = {"x": x, "y": y, "scale": scale, "updated_at": datetime.now().isoformat()}
|
||
save_asset_positions(all_pos)
|
||
return jsonify({"ok": True, "key": key, "x": x, "y": y, "scale": scale})
|
||
except Exception as e:
|
||
return jsonify({"ok": False, "msg": str(e)}), 500
|
||
|
||
|
||
@app.route("/assets/defaults", methods=["GET"])
|
||
def assets_defaults_get():
|
||
guard = _require_asset_editor_auth()
|
||
if guard:
|
||
return guard
|
||
try:
|
||
return jsonify({"ok": True, "items": load_asset_defaults()})
|
||
except Exception as e:
|
||
return jsonify({"ok": False, "msg": str(e)}), 500
|
||
|
||
|
||
@app.route("/assets/defaults", methods=["POST"])
|
||
def assets_defaults_set():
|
||
guard = _require_asset_editor_auth()
|
||
if guard:
|
||
return guard
|
||
try:
|
||
data = request.get_json(silent=True) or {}
|
||
key = (data.get("key") or "").strip()
|
||
x = data.get("x")
|
||
y = data.get("y")
|
||
scale = data.get("scale")
|
||
if not key:
|
||
return jsonify({"ok": False, "msg": "缺少 key"}), 400
|
||
if x is None or y is None:
|
||
return jsonify({"ok": False, "msg": "缺少 x/y"}), 400
|
||
x = float(x)
|
||
y = float(y)
|
||
if scale is None:
|
||
scale = 1.0
|
||
scale = float(scale)
|
||
|
||
all_defaults = load_asset_defaults()
|
||
all_defaults[key] = {"x": x, "y": y, "scale": scale, "updated_at": datetime.now().isoformat()}
|
||
save_asset_defaults(all_defaults)
|
||
return jsonify({"ok": True, "key": key, "x": x, "y": y, "scale": scale})
|
||
except Exception as e:
|
||
return jsonify({"ok": False, "msg": str(e)}), 500
|
||
|
||
|
||
@app.route("/config/gemini", methods=["GET"])
|
||
def gemini_config_get():
|
||
guard = _require_asset_editor_auth()
|
||
if guard:
|
||
return guard
|
||
try:
|
||
cfg = load_runtime_config()
|
||
key = (cfg.get("gemini_api_key") or "").strip()
|
||
masked = ("*" * max(0, len(key) - 4)) + key[-4:] if key else ""
|
||
return jsonify({
|
||
"ok": True,
|
||
"has_api_key": bool(key),
|
||
"api_key_masked": masked,
|
||
"gemini_model": _normalize_user_model(cfg.get("gemini_model") or "nanobanana-pro"),
|
||
})
|
||
except Exception as e:
|
||
return jsonify({"ok": False, "msg": str(e)}), 500
|
||
|
||
|
||
@app.route("/config/gemini", methods=["POST"])
|
||
def gemini_config_set():
|
||
guard = _require_asset_editor_auth()
|
||
if guard:
|
||
return guard
|
||
try:
|
||
data = request.get_json(silent=True) or {}
|
||
api_key = (data.get("api_key") or "").strip()
|
||
model = _normalize_user_model((data.get("model") or "").strip() or "nanobanana-pro")
|
||
payload = {"gemini_model": model}
|
||
if api_key:
|
||
payload["gemini_api_key"] = api_key
|
||
save_runtime_config(payload)
|
||
return jsonify({"ok": True, "msg": "Gemini 配置已保存"})
|
||
except Exception as e:
|
||
return jsonify({"ok": False, "msg": str(e)}), 500
|
||
|
||
|
||
@app.route("/assets/restore-default", methods=["POST"])
|
||
def assets_restore_default():
|
||
guard = _require_asset_editor_auth()
|
||
if guard:
|
||
return guard
|
||
try:
|
||
data = request.get_json(silent=True) or {}
|
||
rel_path = (data.get("path") or "").strip().lstrip("/")
|
||
if not rel_path:
|
||
return jsonify({"ok": False, "msg": "缺少 path"}), 400
|
||
|
||
target = (FRONTEND_PATH / rel_path).resolve()
|
||
try:
|
||
target.relative_to(FRONTEND_PATH.resolve())
|
||
except Exception:
|
||
return jsonify({"ok": False, "msg": "非法 path"}), 400
|
||
|
||
if not target.exists():
|
||
return jsonify({"ok": False, "msg": "目标文件不存在"}), 404
|
||
|
||
root, ext = os.path.splitext(str(target))
|
||
default_path = root + ext + ".default"
|
||
if not os.path.exists(default_path):
|
||
return jsonify({"ok": False, "msg": "未找到默认资产快照"}), 404
|
||
|
||
# 回滚前保留上一版
|
||
bak = str(target) + ".bak"
|
||
if os.path.exists(str(target)):
|
||
shutil.copy2(str(target), bak)
|
||
|
||
shutil.copy2(default_path, str(target))
|
||
st = os.stat(str(target))
|
||
return jsonify({"ok": True, "path": rel_path, "size": st.st_size, "msg": "已重置为默认资产"})
|
||
except Exception as e:
|
||
return jsonify({"ok": False, "msg": str(e)}), 500
|
||
|
||
|
||
@app.route("/assets/restore-prev", methods=["POST"])
|
||
def assets_restore_prev():
|
||
guard = _require_asset_editor_auth()
|
||
if guard:
|
||
return guard
|
||
try:
|
||
data = request.get_json(silent=True) or {}
|
||
rel_path = (data.get("path") or "").strip().lstrip("/")
|
||
if not rel_path:
|
||
return jsonify({"ok": False, "msg": "缺少 path"}), 400
|
||
|
||
target = (FRONTEND_PATH / rel_path).resolve()
|
||
try:
|
||
target.relative_to(FRONTEND_PATH.resolve())
|
||
except Exception:
|
||
return jsonify({"ok": False, "msg": "非法 path"}), 400
|
||
|
||
bak = str(target) + ".bak"
|
||
if not os.path.exists(bak):
|
||
return jsonify({"ok": False, "msg": "未找到上一版备份"}), 404
|
||
|
||
shutil.copy2(str(target), bak + ".tmp") if os.path.exists(str(target)) else None
|
||
shutil.copy2(bak, str(target))
|
||
st = os.stat(str(target))
|
||
return jsonify({"ok": True, "path": rel_path, "size": st.st_size, "msg": "已回退到上一版"})
|
||
except Exception as e:
|
||
return jsonify({"ok": False, "msg": str(e)}), 500
|
||
|
||
|
||
@app.route("/assets/upload", methods=["POST"])
|
||
def assets_upload():
|
||
guard = _require_asset_editor_auth()
|
||
if guard:
|
||
return guard
|
||
try:
|
||
rel_path = (request.form.get("path") or "").strip().lstrip("/")
|
||
backup = (request.form.get("backup") or "1").strip() != "0"
|
||
f = request.files.get("file")
|
||
|
||
if not rel_path or f is None:
|
||
return jsonify({"ok": False, "msg": "缺少 path 或 file"}), 400
|
||
|
||
target = (FRONTEND_PATH / rel_path).resolve()
|
||
try:
|
||
target.relative_to(FRONTEND_PATH.resolve())
|
||
except Exception:
|
||
return jsonify({"ok": False, "msg": "非法 path"}), 400
|
||
|
||
if target.suffix.lower() not in ASSET_ALLOWED_EXTS:
|
||
return jsonify({"ok": False, "msg": "仅允许上传图片/美术资源类型"}), 400
|
||
|
||
if not target.exists():
|
||
return jsonify({"ok": False, "msg": "目标文件不存在,请先从 /assets/list 选择 path"}), 404
|
||
|
||
target.parent.mkdir(parents=True, exist_ok=True)
|
||
|
||
# 首次上传前固化默认资产快照,供“重置为默认资产”使用
|
||
default_snap = Path(str(target) + ".default")
|
||
if not default_snap.exists():
|
||
try:
|
||
shutil.copy2(target, default_snap)
|
||
except Exception:
|
||
pass
|
||
|
||
if backup:
|
||
bak = target.with_suffix(target.suffix + ".bak")
|
||
shutil.copy2(target, bak)
|
||
|
||
auto_sheet = (request.form.get("auto_spritesheet") or "0").strip() == "1"
|
||
ext_name = (f.filename or "").lower()
|
||
|
||
if auto_sheet and target.suffix.lower() in {".webp", ".png"}:
|
||
with tempfile.NamedTemporaryFile(suffix=os.path.splitext(ext_name)[1] or ".gif", delete=False) as tf:
|
||
src_path = tf.name
|
||
f.save(src_path)
|
||
try:
|
||
in_w, in_h = _probe_animated_frame_size(src_path)
|
||
frame_w = int(request.form.get("frame_w") or (in_w or 64))
|
||
frame_h = int(request.form.get("frame_h") or (in_h or 64))
|
||
|
||
# 如果是静态图上传到精灵表目标,按网格切片而不是整图覆盖
|
||
if not (ext_name.endswith(".gif") or ext_name.endswith(".webp")) and Image is not None:
|
||
try:
|
||
with Image.open(src_path) as sim:
|
||
sim = sim.convert("RGBA")
|
||
sw, sh = sim.size
|
||
if frame_w <= 0 or frame_h <= 0:
|
||
frame_w, frame_h = sw, sh
|
||
cols = max(1, sw // frame_w)
|
||
rows = max(1, sh // frame_h)
|
||
sheet_w = cols * frame_w
|
||
sheet_h = rows * frame_h
|
||
if sheet_w <= 0 or sheet_h <= 0:
|
||
raise RuntimeError("静态图尺寸与帧规格不匹配")
|
||
|
||
cropped = sim.crop((0, 0, sheet_w, sheet_h))
|
||
# 目标是 webp 仍按无损保存,避免像素损失
|
||
if target.suffix.lower() == ".webp":
|
||
cropped.save(str(target), "WEBP", lossless=True, quality=100, method=6)
|
||
else:
|
||
cropped.save(str(target), "PNG")
|
||
|
||
st = target.stat()
|
||
return jsonify({
|
||
"ok": True,
|
||
"path": rel_path,
|
||
"size": st.st_size,
|
||
"backup": backup,
|
||
"converted": {
|
||
"from": ext_name.split(".")[-1] if "." in ext_name else "image",
|
||
"to": "webp_spritesheet" if target.suffix.lower() == ".webp" else "png_spritesheet",
|
||
"frame_w": frame_w,
|
||
"frame_h": frame_h,
|
||
"columns": cols,
|
||
"rows": rows,
|
||
"frames": cols * rows,
|
||
"preserve_original": False,
|
||
"pixel_art": True,
|
||
}
|
||
})
|
||
finally:
|
||
pass
|
||
|
||
# 默认:优先保留输入帧尺寸;若前端传了强制值则按前端。
|
||
preserve_original_val = request.form.get("preserve_original")
|
||
if preserve_original_val is None:
|
||
preserve_original = True
|
||
else:
|
||
preserve_original = preserve_original_val.strip() == "1"
|
||
|
||
pixel_art = (request.form.get("pixel_art") or "1").strip() == "1"
|
||
req_cols = int(request.form.get("cols") or 0)
|
||
req_rows = int(request.form.get("rows") or 0)
|
||
sheet_path, cols, rows, frames, out_fw, out_fh = _animated_to_spritesheet(
|
||
src_path,
|
||
frame_w,
|
||
frame_h,
|
||
out_ext=target.suffix.lower(),
|
||
preserve_original=preserve_original,
|
||
pixel_art=pixel_art,
|
||
cols=(req_cols if req_cols > 0 else None),
|
||
rows=(req_rows if req_rows > 0 else None),
|
||
)
|
||
shutil.move(sheet_path, str(target))
|
||
st = target.stat()
|
||
from_type = "gif" if ext_name.endswith(".gif") else "webp"
|
||
to_type = "webp_spritesheet" if target.suffix.lower() == ".webp" else "png_spritesheet"
|
||
return jsonify({
|
||
"ok": True,
|
||
"path": rel_path,
|
||
"size": st.st_size,
|
||
"backup": backup,
|
||
"converted": {
|
||
"from": from_type,
|
||
"to": to_type,
|
||
"frame_w": out_fw,
|
||
"frame_h": out_fh,
|
||
"columns": cols,
|
||
"rows": rows,
|
||
"frames": frames,
|
||
"preserve_original": preserve_original,
|
||
"pixel_art": pixel_art,
|
||
}
|
||
})
|
||
finally:
|
||
try:
|
||
os.remove(src_path)
|
||
except Exception:
|
||
pass
|
||
|
||
f.save(str(target))
|
||
st = target.stat()
|
||
return jsonify({"ok": True, "path": rel_path, "size": st.st_size, "backup": backup})
|
||
except Exception as e:
|
||
return jsonify({"ok": False, "msg": str(e)}), 500
|
||
|
||
|
||
if __name__ == "__main__":
|
||
raw_port = os.environ.get("STAR_BACKEND_PORT", "19000")
|
||
try:
|
||
backend_port = int(raw_port)
|
||
except ValueError:
|
||
backend_port = 19000
|
||
if backend_port <= 0:
|
||
backend_port = 19000
|
||
|
||
print("=" * 50)
|
||
print("Star Office UI - Backend State Service")
|
||
print("=" * 50)
|
||
print(f"State file: {STATE_FILE}")
|
||
print(f"Listening on: http://0.0.0.0:{backend_port}")
|
||
if backend_port != 19000:
|
||
print(f"(Port override: set STAR_BACKEND_PORT to change; current: {raw_port})")
|
||
else:
|
||
print("(Set STAR_BACKEND_PORT to use a different port, e.g. 3009)")
|
||
mode = "production" if is_production_mode() else "development"
|
||
print(f"Mode: {mode}")
|
||
if is_production_mode():
|
||
print("Security hardening: ENABLED (strict checks)")
|
||
else:
|
||
weak_flags = []
|
||
if not is_strong_secret(str(app.secret_key)):
|
||
weak_flags.append("weak FLASK_SECRET_KEY/STAR_OFFICE_SECRET")
|
||
if not is_strong_drawer_pass(ASSET_DRAWER_PASS_DEFAULT):
|
||
weak_flags.append("weak ASSET_DRAWER_PASS")
|
||
if weak_flags:
|
||
print("Security hardening: WARNING (dev mode) -> " + ", ".join(weak_flags))
|
||
else:
|
||
print("Security hardening: OK")
|
||
print("=" * 50)
|
||
|
||
app.run(host="0.0.0.0", port=backend_port, debug=False)
|
||
|