unsloth/studio/backend/routes/export.py
Daniel Han 7252410ccc
studio: stream export worker output into the export dialog (#4897)
* studio: stream export worker output into the export dialog

The Export Model dialog only showed a spinner on the "Exporting..."
button while the worker subprocess was doing the actual heavy lifting.
For Merged to 16bit and GGUF / Llama.cpp exports this meant several
minutes (or more, for large models) of opaque silence, with no way to
tell whether save_pretrained_merged, convert_hf_to_gguf.py, or
llama-quantize was making progress.

This adds a live terminal-style output panel inside the export dialog,
rendered just above the Cancel / Start Export buttons and scrollable
with auto-follow-tail. It shows stdout and stderr from both the worker
process itself and any child process it spawns (GGUF converter,
llama-quantize), coloured by stream.

Backend

- core/export/worker.py: new _setup_log_capture(resp_queue) installed
  before LogConfig.setup_logging. It saves the original stdout/stderr
  fds, creates pipes, os.dup2's the write ends onto fds 1 and 2 (so
  every child process inherits the redirected fds), and spins up two
  daemon reader threads. Each thread reads bytes from a pipe, echoes
  them back to the original fd (so the server console keeps working),
  splits on \n and \r, and forwards each line to the resp queue as
  {"type":"log","stream":"stdout|stderr","line":...,"ts":...}.
  PYTHONUNBUFFERED=1 is set so nested Python converters flush
  immediately.
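
  The mechanism can be sketched roughly as follows (a simplification of the
  approach described above, not the actual worker code: the real
  `_setup_log_capture` also splits on `\r` and captures both fds):

  ```python
  import os
  import queue
  import threading

  def setup_log_capture(resp_queue, fd = 1, stream = "stdout"):
      """Redirect `fd` into a pipe and forward complete lines to `resp_queue`.

      Simplified sketch: saves the original fd for echoing, dup2's a pipe's
      write end onto `fd` (so child processes inherit the redirection), and
      runs a daemon reader thread that echoes and line-splits the output.
      """
      saved_fd = os.dup(fd)              # keep the original for echoing
      read_end, write_end = os.pipe()
      os.dup2(write_end, fd)             # children inherit the redirected fd
      os.close(write_end)

      def reader():
          buf = b""
          while True:
              chunk = os.read(read_end, 4096)
              if not chunk:              # all write ends closed
                  break
              os.write(saved_fd, chunk)  # server console keeps working
              buf += chunk
              while b"\n" in buf:
                  line, buf = buf.split(b"\n", 1)
                  resp_queue.put({
                      "type": "log",
                      "stream": stream,
                      "line": line.decode("utf-8", errors = "replace"),
                  })

      threading.Thread(target = reader, daemon = True).start()
      return saved_fd                    # caller can os.dup2 this back to restore
  ```

  Because the redirection happens at the fd level rather than by swapping
  `sys.stdout`, output from `subprocess.run` children lands in the same pipe.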

- core/export/orchestrator.py:
  - Thread-safe ring buffer (collections.deque, maxlen 4000) with a
    monotonically increasing seq counter. clear_logs(),
    get_logs_since(cursor), get_current_log_seq(), is_export_active().
  - _wait_response handles rtype == "log" by appending to the buffer
    and continuing the wait loop. Status messages are also surfaced as
    a "status" stream so users see high level progress alongside raw
    subprocess output.
  - load_checkpoint, _run_export, and cleanup_memory now wrap their
    bodies with the existing self._lock (previously unused), clear the
    log buffer at the start of each op, and flip _export_active in a
    try/finally so the SSE endpoint can detect idle.
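
  A minimal version of that buffer (names assumed, mirroring the description
  above rather than the actual orchestrator code) looks like:

  ```python
  import threading
  from collections import deque

  class ExportLogBuffer:
      """Sketch of a seq-cursored ring buffer for export log lines."""

      def __init__(self, maxlen = 4000):
          self._lock = threading.Lock()
          self._buf = deque(maxlen = maxlen)   # oldest lines fall off the front
          self._seq = 0                        # monotonic, never reset

      def append(self, stream, line):
          with self._lock:
              self._seq += 1
              self._buf.append({"seq": self._seq, "stream": stream, "line": line})

      def clear_logs(self):
          # Drop buffered lines but keep the seq counter, so a client
          # holding an old cursor still makes progress after reconnect.
          with self._lock:
              self._buf.clear()

      def get_logs_since(self, cursor):
          with self._lock:
              entries = [e for e in self._buf if e["seq"] > cursor]
              new_cursor = entries[-1]["seq"] if entries else cursor
              return entries, new_cursor
  ```

  The strictly-greater-than cursor comparison is what lets the SSE endpoint
  resume from `Last-Event-ID` without replaying or dropping lines.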

- routes/export.py:
  - Wrapped every sync orchestrator call (load_checkpoint,
    cleanup_memory, export_merged_model, export_base_model,
    export_gguf, export_lora_adapter) in asyncio.to_thread so the
    FastAPI event loop stays free during long exports. Without this
    the new SSE endpoint could not be served concurrently with the
    blocking export POST.
  - New GET /api/export/logs/stream SSE endpoint. Honors
    Last-Event-ID and a since query param for reconnect, emits log /
    heartbeat / complete / error events, uses the id field to carry
    the log seq so clients can resume cleanly. On first connect
    without an explicit cursor it starts from the current seq so old
    lines from a previous run are not replayed.

Frontend

- features/export/api/export-api.ts: streamExportLogs() helper that
  authFetches the SSE endpoint and parses id / event / data fields
  manually (same pattern as streamTrainingProgress in train-api.ts).
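
  The id / event / data framing that helper handles can be illustrated in
  Python (illustration only; the real parser is TypeScript and works
  incrementally on a ReadableStream):

  ```python
  def parse_sse(text):
      """Parse raw SSE text into (id, event, data) tuples.

      Simplified: assumes integer ids and ignores retry/comment lines.
      """
      events = []
      event_id, event_type, data_lines = None, "message", []
      for line in text.splitlines():
          if line == "":                       # blank line terminates one event
              if data_lines:
                  events.append((event_id, event_type, "\n".join(data_lines)))
              event_type, data_lines = "message", []
          elif line.startswith("id:"):
              event_id = int(line[3:].strip())
          elif line.startswith("event:"):
              event_type = line[6:].strip()
          elif line.startswith("data:"):
              data_lines.append(line[5:].strip())
      return events
  ```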

- features/export/components/export-dialog.tsx:
  - Local useExportLogs(exporting) hook that opens the SSE stream on
    exporting transitions to true, accumulates up to 4000 lines in
    component state, and aborts on cleanup.
  - New scrollable output panel rendered above DialogFooter, only
    shown for Merged to 16bit and GGUF / Llama.cpp (LoRA adapter is
    a fast disk write with nothing to show). Dark terminal styling
    (bg-black/85, emerald text, rose for stderr, sky for status),
    max-height 14rem, auto-scrolls to the bottom on new output but
    stops following if the user scrolls up. A small streaming / idle
    indicator is shown next to the panel title.
  - DialogContent widens from sm:max-w-lg to sm:max-w-2xl when the
    output panel is visible so the logs have room to breathe.

Verified

- Python smoke test (tests/smoke_export_log_capture.py): spawns a
  real mp.get_context("spawn") process, installs _setup_log_capture,
  confirms that parent stdout prints, parent stderr prints, AND a
  child subprocess invoked via subprocess.run (both its stdout and
  stderr) are all captured in the resp queue. Passes.
- Orchestrator log helpers tested in isolation: _append_log,
  get_logs_since (with and without a cursor), clear_logs not
  resetting seq so reconnecting clients still progress. Passes.
- routes.export imports cleanly in the studio venv and /logs/stream
  shows up in router.routes.
- bun run build: tsc -b plus vite build, no TypeScript errors.

No existing export behavior is changed. If the subprocess, the SSE
endpoint, or the frontend hook fails, the export itself still runs to
completion the same way it did before, with or without logs visible.

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* export dialog: trim bootstrap noise, scope logs per screen, show realpath

Several follow-ups to the live export log work:

1. Worker bootstrap noise (transformers venv activation, Unsloth banner,
   "Top GGUF/hub models" lists, vision detection, 2k-step weight load
   bar) is dropped from the export-dialog stream. A threading.Event
   gate in worker.py defaults closed and only opens once _handle_export
   actually starts; until then the reader thread still echoes lines to
   the saved console fd for debugging but does not push them onto the
   resp_queue. The orchestrator already spawns a fresh subprocess for
   every checkpoint load, so the gate is naturally reset between runs.
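
   The gating logic amounts to roughly this (a sketch with assumed names, not
   the actual worker code):

   ```python
   import threading

   _stream_gate = threading.Event()   # closed by default: bootstrap noise stays local

   def forward_line(entry, resp_queue, console_lines):
       # Reader-thread body, per line: always echo for server-side debugging,
       # but only push to the orchestrator once the export has actually started.
       console_lines.append(entry["line"])    # stands in for the saved console fd
       if _stream_gate.is_set():
           resp_queue.put(entry)

   def handle_export_started():
       _stream_gate.set()   # opened when the export handler begins real work
   ```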

2. tqdm in non-tty mode defaults to a 10s mininterval, which makes
   multi-step bars look frozen in the panel. Set TQDM_MININTERVAL=0.5
   in the worker env so any tqdm-driven progress emits more often.

3. The dialog's useExportLogs hook now also clears its line buffer
   when exportMethod or open changes, so re-opening the dialog into a
   different action's screen no longer shows the previous action's
   saved output. A useElapsedSeconds tick + "Working Xs" badge in the
   log header gives users a visible sign that long single-step phases
   (cache copies, GGUF conversion) are still running when no new lines
   are arriving.

4. ExportBackend.export_{merged,base,gguf,lora} now return
   (success, message, output_path); the worker forwards output_path on
   each export_*_done response, the orchestrator's _run_export passes
   it to routes/export.py, which surfaces it via
   ExportOperationResponse.details.output_path. The dialog's Export
   Complete screen renders the resolved on-disk realpath under "Saved
   to" so users can find their exported model directly.

* fix(cli): unpack 3-tuple return from export backend

ExportOrchestrator.export_{merged,base,gguf,lora} now return
(success, message, output_path) so the studio dialog can show
the on-disk realpath. The CLI still unpacked 2 values, so every
`unsloth export --format ...` crashed with ValueError before
reporting completion. Update the four call sites and surface
output_path via a "Saved to:" echo.

* fix(studio): anchor export log SSE cursor at run start

The export dialog SSE defaulted its cursor to get_current_log_seq()
at connect time, so any line emitted between the POST that kicks
off the export and the client opening the stream was buffered with
seqs 1..k and then skipped (seq <= cursor). Long-running exports
looked silent during their first seconds.

Snapshot _log_seq into _run_start_seq inside clear_logs() and
expose it via get_run_start_seq(). The SSE default cursor now uses
that snapshot, so every line emitted since the current run began
is reachable regardless of when the client connects. Old runs
still can't leak in because their seqs are <= the snapshot.
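
The snapshot is a small change in spirit (sketch with assumed names, focused
on just the cursor-anchoring part):

```python
import threading
from collections import deque

class LogCursor:
    """Sketch of the run-start snapshot described above."""

    def __init__(self):
        self._lock = threading.Lock()
        self._buf = deque(maxlen = 4000)
        self._log_seq = 0
        self._run_start_seq = 0

    def append(self, line):
        with self._lock:
            self._log_seq += 1
            self._buf.append({"seq": self._log_seq, "line": line})

    def clear_logs(self):
        with self._lock:
            self._buf.clear()
            self._run_start_seq = self._log_seq   # anchor: current run starts here

    def get_run_start_seq(self):
        with self._lock:
            return self._run_start_seq
```

Defaulting the SSE cursor to the snapshot rather than the live seq means
every line with `seq > run_start_seq` is reachable, however late the client
connects.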

* fix(studio): reconnect export log SSE on stream drop

useExportLogs launched streamExportLogs once per exporting
transition and recorded any drop in .catch(). Long GGUF exports
behind a proxy with an idle kill-timeout would silently lose the
stream for the rest of the run even though the backend already
supports Last-Event-ID resume. The "retry: 3000" directive emitted
by the backend is only meaningful to native EventSource; this
hook uses a manual fetch + ReadableStream parse so it had no
effect.

Wrap streamExportLogs in a retry loop that tracks lastSeq from
ExportLogEvent.id and passes it as since on reconnect. Backoff is
exponential with jitter, capped at 5s, reset on successful open.
The loop stops on explicit backend `complete` event or on effect
cleanup.
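
The delay schedule is the standard exponential-backoff-with-jitter pattern;
in Python terms (illustration only, since the hook itself is TypeScript):

```python
import random

def reconnect_delay(attempt, base = 0.25, cap = 5.0):
    """Exponential backoff with full jitter, capped at `cap` seconds.

    attempt resets to 0 whenever a stream opens successfully.
    """
    return random.uniform(0, min(cap, base * (2 ** attempt)))
```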

* fix(studio): register a second command so Typer keeps `export` as a subcommand

The CLI export unpacking tests wrap `unsloth_cli.commands.export.export`
in a fresh Typer app with a single registered command. Typer flattens a
single-command app into that command, so the test's
`runner.invoke(cli_app, ["export", ckpt, out, ...])` treats the leading
`"export"` token as an unexpected extra positional argument -- every
parametrized case failed with:

    Got unexpected extra argument (.../out)

Register a harmless `noop` second command so Typer preserves subcommand
routing and the tests actually exercise the 3-tuple unpack path they
were written to guard.

Before: 4 failed
After:  4 passed
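
The flattening behaviour is easy to reproduce in isolation (hypothetical
commands; requires typer installed):

```python
import typer
from typer.testing import CliRunner

def build_app(with_noop):
    app = typer.Typer()

    @app.command()
    def export(path: str):
        typer.echo(f"exported {path}")

    if with_noop:
        @app.command()
        def noop():
            """Placeholder so Typer keeps subcommand routing."""

    return app

runner = CliRunner()
# Single command: Typer flattens the app into that command, so the leading
# "export" token fills the positional and "out" becomes an unexpected extra.
flat = runner.invoke(build_app(False), ["export", "out"])
# Two commands: "export" resolves as a subcommand and runs normally.
routed = runner.invoke(build_app(True), ["export", "out"])
```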

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: studio-install <studio@local.install>
Co-authored-by: Roland Tannous <115670425+rolandtannous@users.noreply.github.com>
Co-authored-by: Lee Jackson <130007945+Imagineer99@users.noreply.github.com>
Co-authored-by: Roland Tannous <rolandtannous@gravityq.ai>
2026-04-14 08:55:43 -07:00


# SPDX-License-Identifier: AGPL-3.0-only
# Copyright 2026-present the Unsloth AI Inc. team. All rights reserved. See /studio/LICENSE.AGPL-3.0
"""
Export API routes: checkpoint discovery and model export operations.
"""
import asyncio
import json
import sys
import time
from pathlib import Path
from typing import Any, AsyncGenerator, Dict, List, Optional, Tuple

from fastapi import APIRouter, Depends, HTTPException, Query, Request
from fastapi.responses import StreamingResponse
import structlog

from loggers import get_logger

# Add backend directory to path
backend_path = Path(__file__).parent.parent.parent
if str(backend_path) not in sys.path:
    sys.path.insert(0, str(backend_path))

# Auth
from auth.authentication import get_current_subject

# Import backend functions
try:
    from core.export import get_export_backend
except ImportError:
    parent_backend = backend_path.parent / "backend"
    if str(parent_backend) not in sys.path:
        sys.path.insert(0, str(parent_backend))
    from core.export import get_export_backend

# Import Pydantic models
from models import (
    LoadCheckpointRequest,
    ExportStatusResponse,
    ExportOperationResponse,
    ExportMergedModelRequest,
    ExportBaseModelRequest,
    ExportGGUFRequest,
    ExportLoRAAdapterRequest,
)

router = APIRouter()
logger = get_logger(__name__)
@router.post("/load-checkpoint", response_model = ExportOperationResponse)
async def load_checkpoint(
    request: LoadCheckpointRequest,
    current_subject: str = Depends(get_current_subject),
):
    """
    Load a checkpoint into the export backend.

    Wraps ExportBackend.load_checkpoint.
    """
    try:
        # Version switching is handled automatically by the subprocess-based
        # export backend — no need for ensure_transformers_version() here.
        # Free GPU memory: shut down any running inference/training subprocesses
        # before loading the export checkpoint (they'd compete for VRAM).
        try:
            from core.inference import get_inference_backend

            inf = get_inference_backend()
            if inf.active_model_name:
                logger.info(
                    "Unloading inference model '%s' to free GPU memory for export",
                    inf.active_model_name,
                )
                inf._shutdown_subprocess()
                inf.active_model_name = None
                inf.models.clear()
        except Exception as e:
            logger.warning("Could not unload inference model: %s", e)
        try:
            from core.training import get_training_backend

            trn = get_training_backend()
            if trn.is_training_active():
                logger.info("Stopping active training to free GPU memory for export")
                trn.stop_training()
                # Wait for training subprocess to actually exit before proceeding,
                # otherwise it may still hold GPU memory when export tries to load.
                for _ in range(60):  # up to 30s
                    if not trn.is_training_active():
                        break
                    time.sleep(0.5)
                else:
                    logger.warning(
                        "Training subprocess did not exit within 30s, proceeding anyway"
                    )
        except Exception as e:
            logger.warning("Could not stop training: %s", e)
        backend = get_export_backend()
        # load_checkpoint spawns and waits on a subprocess and can take
        # minutes. Run it in a worker thread so the event loop stays
        # free to serve the live log SSE stream concurrently.
        success, message = await asyncio.to_thread(
            backend.load_checkpoint,
            checkpoint_path = request.checkpoint_path,
            max_seq_length = request.max_seq_length,
            load_in_4bit = request.load_in_4bit,
            trust_remote_code = request.trust_remote_code,
        )
        if not success:
            raise HTTPException(status_code = 400, detail = message)
        return ExportOperationResponse(success = True, message = message)
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error loading checkpoint: {e}", exc_info = True)
        raise HTTPException(
            status_code = 500,
            detail = f"Failed to load checkpoint: {str(e)}",
        )
@router.post("/cleanup", response_model = ExportOperationResponse)
async def cleanup_export_memory(
    current_subject: str = Depends(get_current_subject),
):
    """
    Cleanup export-related models from memory (GPU/CPU).

    Wraps ExportBackend.cleanup_memory.
    """
    try:
        backend = get_export_backend()
        success = await asyncio.to_thread(backend.cleanup_memory)
        if not success:
            raise HTTPException(
                status_code = 500,
                detail = "Memory cleanup failed. See server logs for details.",
            )
        return ExportOperationResponse(
            success = True,
            message = "Memory cleanup completed successfully",
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error during export memory cleanup: {e}", exc_info = True)
        raise HTTPException(
            status_code = 500,
            detail = f"Failed to cleanup export memory: {str(e)}",
        )
@router.get("/status", response_model = ExportStatusResponse)
async def get_export_status(
    current_subject: str = Depends(get_current_subject),
):
    """
    Get current export backend status (loaded checkpoint, model type, PEFT flag).
    """
    try:
        backend = get_export_backend()
        return ExportStatusResponse(
            current_checkpoint = backend.current_checkpoint,
            is_vision = bool(getattr(backend, "is_vision", False)),
            is_peft = bool(getattr(backend, "is_peft", False)),
        )
    except Exception as e:
        logger.error(f"Error getting export status: {e}", exc_info = True)
        raise HTTPException(
            status_code = 500,
            detail = f"Failed to get export status: {str(e)}",
        )
def _export_details(output_path: Optional[str]) -> Optional[Dict[str, Any]]:
    """Wrap the resolved on-disk export path into the details dict the
    frontend reads to populate the Export Complete screen. Returns None
    when the export had no local component (Hub-only push) so the
    Pydantic field stays absent rather than ``{"output_path": null}``.
    """
    if not output_path:
        return None
    return {"output_path": output_path}
@router.post("/export/merged", response_model = ExportOperationResponse)
async def export_merged_model(
    request: ExportMergedModelRequest,
    current_subject: str = Depends(get_current_subject),
):
    """
    Export a merged PEFT model (e.g., 16-bit or 4-bit) and optionally push to Hub.

    Wraps ExportBackend.export_merged_model.
    """
    try:
        backend = get_export_backend()
        success, message, output_path = await asyncio.to_thread(
            backend.export_merged_model,
            save_directory = request.save_directory,
            format_type = request.format_type,
            push_to_hub = request.push_to_hub,
            repo_id = request.repo_id,
            hf_token = request.hf_token,
            private = request.private,
        )
        if not success:
            raise HTTPException(status_code = 400, detail = message)
        return ExportOperationResponse(
            success = True,
            message = message,
            details = _export_details(output_path),
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error exporting merged model: {e}", exc_info = True)
        raise HTTPException(
            status_code = 500,
            detail = f"Failed to export merged model: {str(e)}",
        )
@router.post("/export/base", response_model = ExportOperationResponse)
async def export_base_model(
    request: ExportBaseModelRequest,
    current_subject: str = Depends(get_current_subject),
):
    """
    Export a non-PEFT base model and optionally push to Hub.

    Wraps ExportBackend.export_base_model.
    """
    try:
        backend = get_export_backend()
        success, message, output_path = await asyncio.to_thread(
            backend.export_base_model,
            save_directory = request.save_directory,
            push_to_hub = request.push_to_hub,
            repo_id = request.repo_id,
            hf_token = request.hf_token,
            private = request.private,
            base_model_id = request.base_model_id,
        )
        if not success:
            raise HTTPException(status_code = 400, detail = message)
        return ExportOperationResponse(
            success = True,
            message = message,
            details = _export_details(output_path),
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error exporting base model: {e}", exc_info = True)
        raise HTTPException(
            status_code = 500,
            detail = f"Failed to export base model: {str(e)}",
        )
@router.post("/export/gguf", response_model = ExportOperationResponse)
async def export_gguf(
    request: ExportGGUFRequest,
    current_subject: str = Depends(get_current_subject),
):
    """
    Export the current model to GGUF format and optionally push to Hub.

    Wraps ExportBackend.export_gguf.
    """
    try:
        backend = get_export_backend()
        success, message, output_path = await asyncio.to_thread(
            backend.export_gguf,
            save_directory = request.save_directory,
            quantization_method = request.quantization_method,
            push_to_hub = request.push_to_hub,
            repo_id = request.repo_id,
            hf_token = request.hf_token,
        )
        if not success:
            raise HTTPException(status_code = 400, detail = message)
        return ExportOperationResponse(
            success = True,
            message = message,
            details = _export_details(output_path),
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error exporting GGUF model: {e}", exc_info = True)
        raise HTTPException(
            status_code = 500,
            detail = f"Failed to export GGUF model: {str(e)}",
        )
@router.post("/export/lora", response_model = ExportOperationResponse)
async def export_lora_adapter(
    request: ExportLoRAAdapterRequest,
    current_subject: str = Depends(get_current_subject),
):
    """
    Export only the LoRA adapter (if the loaded model is PEFT).

    Wraps ExportBackend.export_lora_adapter.
    """
    try:
        backend = get_export_backend()
        success, message, output_path = await asyncio.to_thread(
            backend.export_lora_adapter,
            save_directory = request.save_directory,
            push_to_hub = request.push_to_hub,
            repo_id = request.repo_id,
            hf_token = request.hf_token,
            private = request.private,
        )
        if not success:
            raise HTTPException(status_code = 400, detail = message)
        return ExportOperationResponse(
            success = True,
            message = message,
            details = _export_details(output_path),
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error exporting LoRA adapter: {e}", exc_info = True)
        raise HTTPException(
            status_code = 500,
            detail = f"Failed to export LoRA adapter: {str(e)}",
        )
# ─────────────────────────────────────────────────────────────────────
# Live export log stream (Server-Sent Events)
# ─────────────────────────────────────────────────────────────────────
#
# The export worker subprocess redirects its stdout/stderr into a pipe
# that a reader thread forwards to the orchestrator as log entries (see
# core/export/worker.py::_setup_log_capture and
# core/export/orchestrator.py::_append_log). This endpoint streams
# those entries to the browser so the export dialog can show a live
# terminal-style output panel while load_checkpoint / export_merged /
# export_gguf / export_lora / export_base run.
#
# Shape follows the training progress SSE endpoint
# (routes/training.py::stream_training_progress): each event carries
# `id`, `event`, and `data` fields, the stream starts with a `retry:`
# directive, and `Last-Event-ID` is honored on reconnect.
def _format_sse(data: str, event: str, event_id: Optional[int] = None) -> str:
    """Format a single SSE message with id/event/data fields."""
    lines = []
    if event_id is not None:
        lines.append(f"id: {event_id}")
    lines.append(f"event: {event}")
    lines.append(f"data: {data}")
    lines.append("")
    lines.append("")
    return "\n".join(lines)
@router.get("/logs/stream")
async def stream_export_logs(
    request: Request,
    since: Optional[int] = Query(
        None,
        description = "Return log entries with seq strictly greater than this cursor.",
    ),
    current_subject: str = Depends(get_current_subject),
):
    """
    Stream live stdout/stderr output from the export worker subprocess
    as Server-Sent Events.

    Events:
    - `log`      : a single log line (data: {"stream","line","ts"})
    - `heartbeat`: periodic keepalive when no new lines are available
    - `complete` : emitted once the export worker is idle and no new
                   lines arrived for ~1 second. Clients should close.
    - `error`    : unrecoverable server-side error

    The `id:` field on each event is the log entry's monotonic seq
    number so the browser can resume via `Last-Event-ID` on reconnect.
    """
    backend = get_export_backend()
    # Determine starting cursor. Explicit `since` wins, then
    # Last-Event-ID header on reconnect, otherwise start from the
    # run-start snapshot captured by clear_logs() so the client sees
    # every line emitted since the current run began -- even if the
    # SSE connection opened after the POST that kicked off the export.
    # Using get_current_log_seq() here would lose the early bootstrap
    # lines that arrive in the gap between POST and SSE connect.
    last_event_id = request.headers.get("last-event-id")
    if since is None and last_event_id is not None:
        try:
            since = int(last_event_id)
        except ValueError:
            pass
    if since is None:
        cursor = backend.get_run_start_seq()
    else:
        cursor = max(0, int(since))

    async def event_generator() -> AsyncGenerator[str, None]:
        nonlocal cursor
        # Tell the browser to reconnect after 3 seconds if the
        # connection drops mid-export.
        yield "retry: 3000\n\n"
        last_yield = time.monotonic()
        idle_since: Optional[float] = None
        try:
            while True:
                if await request.is_disconnected():
                    return
                entries, new_cursor = backend.get_logs_since(cursor)
                if entries:
                    for entry in entries:
                        payload = json.dumps(
                            {
                                "stream": entry.get("stream", "stdout"),
                                "line": entry.get("line", ""),
                                "ts": entry.get("ts"),
                            }
                        )
                        yield _format_sse(
                            payload,
                            event = "log",
                            event_id = int(entry.get("seq", 0)),
                        )
                    cursor = new_cursor
                    last_yield = time.monotonic()
                    idle_since = None
                else:
                    now = time.monotonic()
                    if now - last_yield > 10.0:
                        yield _format_sse("{}", event = "heartbeat")
                        last_yield = now
                    if not backend.is_export_active():
                        # Give the reader thread a moment to drain any
                        # trailing lines the worker process printed
                        # just before signalling done.
                        if idle_since is None:
                            idle_since = now
                        elif now - idle_since > 1.0:
                            yield _format_sse(
                                "{}",
                                event = "complete",
                                event_id = cursor,
                            )
                            return
                    else:
                        idle_since = None
                await asyncio.sleep(0.1)
        except asyncio.CancelledError:
            # Client disconnected mid-yield. Don't re-raise, just end
            # the generator cleanly so StreamingResponse finalizes.
            return
        except Exception as exc:
            logger.error("Export log stream failed: %s", exc, exc_info = True)
            try:
                yield _format_sse(
                    json.dumps({"error": str(exc)}),
                    event = "error",
                )
            except Exception:
                pass

    return StreamingResponse(
        event_generator(),
        media_type = "text/event-stream",
        headers = {
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )