Studio: forward standard OpenAI tools / tool_choice on /v1/responses (Codex compat) (#5122)

* Studio: forward standard OpenAI tools / tool_choice on /v1/responses

Mirrors the /v1/chat/completions client-side tool pass-through from #5099
so clients (OpenAI Codex CLI, OpenAI Python SDK, ...) that target the
Responses API receive structured function_call output items instead of
plain text with tool-call tokens leaking into content.

- ResponsesRequest: type tools/tool_choice properly, add parallel_tool_calls;
  accept function_call and function_call_output input items for multi-turn
- Translate flat Responses tool / tool_choice shape to the nested Chat
  Completions shape before forwarding to llama-server
- _normalise_responses_input: map function_call_output -> role="tool",
  function_call -> assistant tool_calls (preserving call_id)
- Non-streaming: map returned tool_calls -> top-level function_call
  output items keyed by call_id
- Streaming: emit response.output_item.added (function_call),
  response.function_call_arguments.delta/.done, and response.output_item.done
  per tool call while keeping the text message at output_index 0
- Pytest coverage: tools/tool_choice translation, multi-turn input mapping,
  non-streaming tool_calls mapping, response round-trip
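
In code, the flat-to-nested translation amounts to the following stand-alone sketch (`translate_tools` is an illustrative name; the real helper lives in routes/inference.py):

```python
# Minimal sketch of the Responses (flat) -> Chat Completions (nested) tool shape.
def translate_tools(tools):
    """Map Responses-shape function tools to the Chat Completions nested shape."""
    out = []
    for tool in tools or []:
        if tool.get("type") != "function":
            continue  # built-in tools (web_search, file_search, mcp) are dropped
        fn = {
            key: tool[key]
            for key in ("name", "description", "parameters", "strict")
            if tool.get(key) is not None
        }
        out.append({"type": "function", "function": fn})
    return out or None

flat = [
    {"type": "function", "name": "get_weather",
     "parameters": {"type": "object", "properties": {"city": {"type": "string"}}}},
    {"type": "web_search"},  # built-in: silently skipped
]
nested = translate_tools(flat)
print(nested)
```

Dropping non-function entries here (rather than forwarding them) avoids an opaque upstream 400 from llama-server.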

* Studio: merge system messages and close inner stream on /v1/responses

Fixes two issues surfacing when OpenAI Codex CLI drives /v1/responses
against a GGUF with a strict chat template (gpt-oss harmony, Qwen3, ...).

1. "System message must be at the beginning" upstream errors
   Codex sends `instructions` AND a `role:"developer"` message in `input`,
   producing two separate system-role messages. Strict templates raise
   when a second system message exists or when one appears after a user
   turn. _normalise_responses_input now hoists all instructions / system /
   developer content into a single merged system message at the top of
   the Chat Completions message list.

2. "async generator ignored GeneratorExit" / "Attempted to exit cancel
   scope in a different task"
   _responses_stream consumed the inner chat-completions body_iterator
   without an explicit aclose() in a finally block. On client disconnect
   (Codex frequently cancels mid-stream), Python 3.13 finalized the inner
   async generator on a different task, tripping anyio's cancel-scope
   check. Mirrored the same try/finally + aclose pattern used by the
   /v1/messages, /v1/chat/completions, and /v1/completions passthroughs.

Tests: hoisting of instructions + developer, developer mid-conversation,
multiple system messages in input, no-system passthrough.
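
The hoist-and-merge can be sketched at the dict level (simplified; the real code builds Pydantic ChatMessage objects and also flattens content parts):

```python
def merge_system(instructions, items):
    """Hoist instructions + system/developer messages into one leading system message."""
    system_parts = [instructions] if instructions else []
    rest = []
    for item in items:
        if item.get("role") in ("system", "developer"):
            system_parts.append(item["content"])  # hoisted out of position
        else:
            rest.append(item)
    merged = "\n\n".join(p for p in system_parts if p)
    # A single system message at index 0 keeps strict templates happy.
    return ([{"role": "system", "content": merged}] if merged else []) + rest

msgs = merge_system(
    "You are Codex.",
    [{"role": "developer", "content": "Prefer short answers."},
     {"role": "user", "content": "hi"}],
)
print(msgs)
```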

* Studio: accept Codex multi-turn shapes and fix cross-task stream close on /v1/responses

Two issues observed driving /v1/responses from OpenAI Codex CLI against a
GGUF backend.

1. 422 on every turn after the first
   Codex replays prior assistant turns with
   `content:[{"type":"output_text","text":...,"annotations":[],"logprobs":[]}]`
   and carries forward `reasoning` items (o-series / gpt-5) between turns.
   Our `ResponsesContentPart` union only accepted input_text / input_image,
   and `ResponsesInputItem` only message / function_call / function_call_output,
   so Pydantic failed the whole list and FastAPI returned
   `"Input should be a valid string"` against the `str` branch of the
   outer union.

   - Add `ResponsesOutputTextPart` for assistant-replay content.
   - Add `ResponsesUnknownContentPart` and `ResponsesUnknownInputItem`
     as permissive catch-alls (drop during normalisation).
   - Wire an explicit `Discriminator` so dispatch is deterministic and
     the fallthrough reaches the catch-all instead of misreporting via
     the outer `Union[str, list[...]]`.
   - `_normalise_responses_input` now accepts output_text parts, flattens
     single-part assistant text to a plain string (keeps legacy chat
     templates happy), and silently drops reasoning / unknown items.

2. "async generator ignored GeneratorExit" / cross-task cancel scope
   `_responses_stream` awaited `openai_chat_completions` in the parent
   route-handler task, which opens the httpx client for the inner
   passthrough on *that* task. The outer `StreamingResponse` then iterates
   in a child task, so the asyncgen GC finalises the inner httpcore byte
   stream on the child task, tripping anyio's "Attempted to exit cancel
   scope in a different task". Move the `await` inside `event_generator`
   so the httpx lifecycle stays within the single streaming child task,
   and surface any HTTPException as a `response.failed` SSE frame.

Tests: assistant output_text replay, reasoning-item tolerance, unknown
content-part tolerance, end-to-end Codex-shape payload (developer + user +
reasoning + function_call + function_call_output + assistant output_text +
user), and single-part assistant flattening to plain string.
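
The discriminator's routing order, reduced to plain dicts (illustrative sketch; the real function also handles model instances via getattr):

```python
def route_item(v):
    """Route a Responses input item dict to its tagged union variant."""
    t = v.get("type")
    r = v.get("role")
    if t == "function_call":
        return "function_call"
    if t == "function_call_output":
        return "function_call_output"
    if r is not None or t == "message":
        return "message"
    return "unknown"  # reasoning items, future types -> permissive catch-all

print(route_item({"type": "reasoning", "summary": []}))
print(route_item({"role": "assistant", "content": []}))
print(route_item({"type": "function_call_output", "call_id": "c1", "output": "ok"}))
```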

* Studio: call llama-server directly from streaming /v1/responses

The previous fix (running the inner await inside event_generator) was not
enough. Wrapping the existing `openai_chat_completions` pass-through still
stacks two async generators: when the outer generator is closed, the
innermost `HTTP11ConnectionByteStream.__aiter__` in httpcore doesn't
receive GeneratorExit before Python's asyncgen GC finalises it in a
sibling task, tripping "Attempted to exit cancel scope in a different
task" and "async generator ignored GeneratorExit" — the same Python 3.13
+ httpcore 1.0.x interaction already seen in PRs #4956, #4981, #5099.

Use the cure the other pass-throughs already had: a single same-task httpx lifecycle with
explicit `aiter_lines().aclose()` BEFORE `resp.aclose()` / `client.aclose()`
in the generator's finally block.

Apply it at the Responses layer by dropping the wrapper entirely for GGUF:
open httpx, consume `resp.aiter_lines()`, parse `chat.completion.chunk`,
emit Responses SSE events, close everything in finally — all in the
single StreamingResponse child task. Non-GGUF streaming is rejected with
a 400 (wrapping the transformers backend would re-introduce the
double-layer pattern and isn't a Codex-compatible path today anyway).

Also surfaces upstream httpx.RequestError / non-200 as a
`response.failed` SSE frame rather than a dropped stream now that the
request is dispatched after SSE headers have gone out.
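
The same-task close pattern, reduced to pure asyncio (sketch only: `inner_lines` stands in for httpx's `resp.aiter_lines()`, and the real finally block also closes the response and client afterwards):

```python
import asyncio

async def inner_lines():
    # Stand-in for the inner line iterator; the real one wraps an
    # httpcore byte stream opened by httpx.
    for i in range(3):
        yield f"chunk-{i}"

async def event_generator():
    lines = inner_lines()
    try:
        async for line in lines:
            yield f"data: {line}\n\n"
    finally:
        # Explicit aclose() on the SAME task that opened the iterator,
        # before anything else is torn down (the pattern from #4981 / #5099),
        # so Python 3.13's asyncgen GC never finalises it on a sibling task.
        await lines.aclose()

async def main():
    frames = []
    gen = event_generator()
    async for frame in gen:
        frames.append(frame)
        if len(frames) == 2:       # simulate a client disconnect mid-stream
            await gen.aclose()     # GeneratorExit propagates; finally runs here
            break
    return frames

print(asyncio.run(main()))
```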

* Studio: silence benign httpcore asyncgen GC warnings on Python 3.13

The streaming pass-throughs (/v1/chat/completions, /v1/messages,
/v1/responses, /v1/completions) all use the proven #4981 / #5099 pattern
— single-task httpx lifecycle with explicit aiter_lines().aclose() ahead
of resp.aclose() / client.aclose() in the generator's finally block.
That handles our own iterators correctly.

The residual noise ("async generator ignored GeneratorExit" /
"Attempted to exit cancel scope in a different task") comes from an
innermost HTTP11ConnectionByteStream.__aiter__ that httpcore creates
internally inside its pool. We hold no reference to it, so we cannot
aclose it ourselves. Python 3.13's asyncgen GC hook finalises it on the
finaliser task, its aclose path enters an anyio CancelScope shield, and
Python flags the cross-task exit. The response has already been
delivered with a 200 by then — it is purely log noise, not a functional
failure. Same interaction seen in modelcontextprotocol/python-sdk #831,
agno #3556, chainlit #2361, langchain-mcp-adapters #254.

Install a targeted sys.unraisablehook that swallows this specific tuple
— RuntimeError mentioning "cancel scope" or "GeneratorExit" plus an
object repr referencing HTTP11ConnectionByteStream — and defers to the
default hook for every other unraisable. Idempotent; guarded by a
sentinel attribute so repeated imports don't stack filters.
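
A stand-alone sketch of the filter (illustrative names; the actual helper in the diff below is `_install_httpcore_asyncgen_silencer`, and `types.SimpleNamespace` here stands in for the real UnraisableHookArgs):

```python
import sys
import types

def install_silencer():
    """Swallow only the known-benign httpcore asyncgen tuple; defer the rest."""
    prior_hook = sys.unraisablehook
    if getattr(prior_hook, "_silencer", False):
        return  # idempotent: repeated imports must not stack filters
    def _hook(unraisable):
        exc = unraisable.exc_value
        obj_repr = repr(unraisable.object) if unraisable.object is not None else ""
        if (
            isinstance(exc, RuntimeError)
            and "HTTP11ConnectionByteStream" in obj_repr
            and ("cancel scope" in str(exc) or "GeneratorExit" in str(exc))
        ):
            return  # benign Python 3.13 + httpcore GC noise: drop it
        prior_hook(unraisable)  # everything else goes to the previous hook
    _hook._silencer = True
    sys.unraisablehook = _hook

# Demo with stub unraisable objects and a recording hook standing in
# for the default one.
seen = []
sys.unraisablehook = lambda u: seen.append(u.err_msg)
install_silencer()
sys.unraisablehook(types.SimpleNamespace(
    exc_value=RuntimeError("Attempted to exit cancel scope in a different task"),
    object="<async_generator object HTTP11ConnectionByteStream.__aiter__>",
    err_msg=None,
))
sys.unraisablehook(types.SimpleNamespace(
    exc_value=ValueError("boom"), object=None, err_msg="real bug",
))
print(seen)  # only the non-matching event reached the recording hook
sys.unraisablehook = sys.__unraisablehook__  # restore after the demo
```
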
Roland Tannous 2026-04-21 13:17:20 +04:00 committed by GitHub
parent c20959dbf4
commit 21e9a91a57
3 changed files with 1469 additions and 108 deletions

@@ -599,23 +599,172 @@ class ResponsesInputImagePart(BaseModel):
detail: Optional[Literal["auto", "low", "high"]] = "auto"
ResponsesContentPart = Union[ResponsesInputTextPart, ResponsesInputImagePart]
class ResponsesOutputTextPart(BaseModel):
"""Assistant ``output_text`` content part replayed on subsequent turns.
When a client (OpenAI Codex CLI, OpenAI Python SDK agents) loops on a
stateless Responses endpoint, prior assistant messages are round-tripped
as ``{"role":"assistant","content":[{"type":"output_text","text":...,
"annotations":[],"logprobs":[]}]}``. We preserve the text and ignore
the annotations/logprobs metadata when flattening into Chat Completions.
"""
type: Literal["output_text"]
text: str
annotations: Optional[list] = None
logprobs: Optional[list] = None
model_config = {"extra": "allow"}
class ResponsesUnknownContentPart(BaseModel):
"""Catch-all for content-part types we don't model explicitly.
Keeps validation green when a client sends newer part types (e.g.
``input_audio``, ``input_file``) we haven't mapped; these are silently
skipped during normalisation rather than rejected with a 422.
"""
type: str
model_config = {"extra": "allow"}
ResponsesContentPart = Union[
ResponsesInputTextPart,
ResponsesInputImagePart,
ResponsesOutputTextPart,
ResponsesUnknownContentPart,
]
class ResponsesInputMessage(BaseModel):
"""A single message in the Responses API input array."""
type: Optional[Literal["message"]] = None
role: Literal["system", "user", "assistant", "developer"]
content: Union[str, list[ResponsesContentPart]]
# Codex (gpt-5.3-codex+) attaches a `phase` field ("commentary" |
# "final_answer") to assistant messages and requires clients to preserve
# it on subsequent turns. We accept and round-trip it; llama-server does
# not care about it.
model_config = {"extra": "allow"}
class ResponsesFunctionCallInputItem(BaseModel):
"""A prior assistant function_call being replayed in a multi-turn Responses input.
The Responses API represents tool calls as top-level input items (not
nested inside assistant messages), correlated across turns by ``call_id``.
"""
type: Literal["function_call"]
id: Optional[str] = Field(
None, description = "Item id assigned by the server (e.g. fc_...)"
)
call_id: str = Field(
...,
description = "Correlation id matching a function_call_output on the next turn.",
)
name: str
arguments: str = Field(
..., description = "JSON string of the arguments the model produced."
)
status: Optional[Literal["in_progress", "completed", "incomplete"]] = None
class ResponsesFunctionCallOutputInputItem(BaseModel):
"""A tool result supplied by the client for a prior function_call.
Replaces Chat Completions' ``role="tool"`` message. Correlated to the
originating call by ``call_id``.
"""
type: Literal["function_call_output"]
id: Optional[str] = None
call_id: str
output: Union[str, list] = Field(
..., description = "String or content-array result of the tool call."
)
status: Optional[Literal["in_progress", "completed", "incomplete"]] = None
class ResponsesUnknownInputItem(BaseModel):
"""Catch-all for Responses input item types we don't model explicitly.
Covers ``reasoning`` items (replayed from prior o-series / gpt-5 turns)
and any future item types the client may send. These items are dropped
during normalisation, since llama-server-backed GGUFs cannot consume them,
but keeping them in the request-model union stops unrelated turns from
failing validation with a 422.
"""
type: str
model_config = {"extra": "allow"}
def _responses_input_item_discriminator(v: Any) -> str:
"""Route a Responses input item to the correct tagged variant.
Pydantic's default smart-union matching fails when one variant in the
union is tagged with a strict ``Literal`` (``function_call`` /
``function_call_output``) and the incoming dict uses a different
``type``: the other variants' validation errors are hidden and the
outer ``Union[str, list[...]]`` reports a misleading "Input should be a
valid string" error. An explicit discriminator makes the routing
deterministic and lets us fall through to the catch-all.
"""
if isinstance(v, dict):
t = v.get("type")
r = v.get("role")
else:
t = getattr(v, "type", None)
r = getattr(v, "role", None)
if t == "function_call":
return "function_call"
if t == "function_call_output":
return "function_call_output"
if r is not None or t == "message":
return "message"
return "unknown"
ResponsesInputItem = Annotated[
Union[
Annotated[ResponsesInputMessage, Tag("message")],
Annotated[ResponsesFunctionCallInputItem, Tag("function_call")],
Annotated[ResponsesFunctionCallOutputInputItem, Tag("function_call_output")],
Annotated[ResponsesUnknownInputItem, Tag("unknown")],
],
Discriminator(_responses_input_item_discriminator),
]
class ResponsesFunctionTool(BaseModel):
"""Flat function-tool definition used by the Responses API request.
Unlike Chat Completions (which nests ``{"name": ..., "parameters": ...}``
inside a ``"function"`` key), the Responses API uses a flat shape with
``type``, ``name``, ``description``, ``parameters``, and ``strict`` at the
top level of each tool entry.
"""
type: Literal["function"]
name: str
description: Optional[str] = None
parameters: Optional[dict] = None
strict: Optional[bool] = None
class ResponsesRequest(BaseModel):
"""OpenAI Responses API request."""
model: str = Field("default", description = "Model identifier")
input: Union[str, list[ResponsesInputMessage]] = Field(
input: Union[str, list[ResponsesInputItem]] = Field(
default = [],
description = "Input text or message list",
description = "Input text or list of messages / function_call / function_call_output items",
)
instructions: Optional[str] = Field(
None, description = "System / developer instructions"
@@ -625,9 +774,31 @@ class ResponsesRequest(BaseModel):
max_output_tokens: Optional[int] = Field(None, ge = 1)
stream: bool = Field(False, description = "Whether to stream the response via SSE")
# Accepted but ignored -- keeps SDK clients from failing on unsupported fields
tools: Optional[list] = None
tool_choice: Optional[Any] = None
# OpenAI function-calling fields — forwarded to llama-server via the
# Chat Completions pass-through (see routes/inference.py). Typed as a
# plain list so built-in tool shapes (``web_search``, ``file_search``,
# ``mcp``, ...) round-trip without validation errors — the translator
# picks out only ``type=="function"`` entries for forwarding.
tools: Optional[list[dict]] = Field(
None,
description = (
"Responses-shape function tool definitions. Entries with "
'`type="function"` are translated to the Chat Completions nested '
"shape before being forwarded to llama-server; other tool types "
"(built-in web_search, file_search, mcp, ...) are accepted for SDK "
"compatibility but ignored on the llama-server passthrough."
),
)
tool_choice: Optional[Any] = Field(
None,
description = (
"'auto' | 'required' | 'none' | {'type': 'function', 'name': ...} — "
"the Responses-shape forcing object is translated to the Chat "
"Completions nested shape internally."
),
)
parallel_tool_calls: Optional[bool] = None
previous_response_id: Optional[str] = None
store: Optional[bool] = None
metadata: Optional[dict] = None
@@ -660,6 +831,28 @@ class ResponsesOutputMessage(BaseModel):
content: list[ResponsesOutputTextContent] = Field(default_factory = list)
class ResponsesOutputFunctionCall(BaseModel):
"""A function-call output item in the Responses API response.
Unlike Chat Completions (which nests tool calls inside the assistant
message), the Responses API emits each tool call as its own top-level
``output`` item so clients can correlate results via ``call_id`` on the
next turn.
"""
type: Literal["function_call"] = "function_call"
id: str = Field(default_factory = lambda: f"fc_{uuid.uuid4().hex[:12]}")
call_id: str
name: str
arguments: str = Field(
..., description = "JSON string of the arguments the model produced."
)
status: Literal["completed", "in_progress", "incomplete"] = "completed"
ResponsesOutputItem = Union[ResponsesOutputMessage, ResponsesOutputFunctionCall]
class ResponsesUsage(BaseModel):
"""Token usage for a Responses API response (input_tokens, not prompt_tokens)."""
@@ -676,7 +869,7 @@ class ResponsesResponse(BaseModel):
created_at: int = Field(default_factory = lambda: int(time.time()))
status: Literal["completed", "in_progress", "failed"] = "completed"
model: str = "default"
output: list[ResponsesOutputMessage] = Field(default_factory = list)
output: list[ResponsesOutputItem] = Field(default_factory = list)
usage: ResponsesUsage = Field(default_factory = ResponsesUsage)
error: Optional[Any] = None
incomplete_details: Optional[Any] = None

@@ -12,7 +12,7 @@ import uuid
from pathlib import Path
from fastapi import APIRouter, Depends, HTTPException, Request, status
from fastapi.responses import StreamingResponse, JSONResponse, Response
from typing import Optional
from typing import Any, Optional, Union
import json
import httpx
import structlog
@@ -27,6 +27,58 @@ import re as _re
from utils.models import extract_model_size_b as _extract_model_size_b
def _install_httpcore_asyncgen_silencer() -> None:
"""Silence benign httpx/httpcore asyncgen GC noise on Python 3.13.
When Studio proxies a streaming response from llama-server via httpx,
the innermost ``HTTP11ConnectionByteStream.__aiter__`` async generator
is finalised by Python's asyncgen GC hook on a task different from the
one that opened it. Its ``aclose`` path then calls
``anyio.Lock.acquire`` / ``cancel_shielded_checkpoint``, which enters a
``CancelScope`` on the finaliser task; Python 3.13 flags the
cross-task exit as ``"Attempted to exit cancel scope in a different
task"`` and prints ``"async generator ignored GeneratorExit"`` as an
unraisable warning.
This is a known httpx + httpcore + anyio interaction (see MCP SDK
python-sdk#831, agno #3556, chainlit #2361, langchain-mcp-adapters
#254). It is benign: the response has already been delivered with a
200. The streaming pass-throughs (``/v1/chat/completions``,
``/v1/messages``, ``/v1/responses``, ``/v1/completions``) already
manage their httpx lifecycle inside a single task with explicit
``aclose()`` of the lines iterator, response, and client; the errant
generator is not one we hold a reference to and therefore cannot
close ourselves.
We install a single process-wide unraisable hook that swallows just
this specific interaction, identified by the pair of a RuntimeError
mentioning cancel scope / GeneratorExit and an object repr referencing
HTTP11ConnectionByteStream, and defers to the default hook for
everything else. The filter is idempotent.
"""
prior_hook = sys.unraisablehook
if getattr(prior_hook, "_unsloth_httpcore_silencer", False):
return
def _hook(unraisable):
exc_value = getattr(unraisable, "exc_value", None)
obj = getattr(unraisable, "object", None)
obj_repr = repr(obj) if obj is not None else ""
if (
isinstance(exc_value, RuntimeError)
and "HTTP11ConnectionByteStream" in obj_repr
and ("cancel scope" in str(exc_value) or "GeneratorExit" in str(exc_value))
):
return
prior_hook(unraisable)
_hook._unsloth_httpcore_silencer = True # type: ignore[attr-defined]
sys.unraisablehook = _hook
_install_httpcore_asyncgen_silencer()
def _friendly_error(exc: Exception) -> str:
"""Extract a user-friendly message from known llama-server errors."""
# httpx transport-layer failures reaching the managed llama-server —
@@ -101,8 +153,14 @@ from models.inference import (
ResponsesInputMessage,
ResponsesInputTextPart,
ResponsesInputImagePart,
ResponsesOutputTextPart,
ResponsesUnknownContentPart,
ResponsesUnknownInputItem,
ResponsesFunctionCallInputItem,
ResponsesFunctionCallOutputInputItem,
ResponsesOutputTextContent,
ResponsesOutputMessage,
ResponsesOutputFunctionCall,
ResponsesUsage,
ResponsesResponse,
AnthropicMessagesRequest,
@@ -2083,49 +2141,229 @@ async def openai_embeddings(
# =====================================================================
def _translate_responses_tools_to_chat(
tools: Optional[list[dict]],
) -> Optional[list[dict]]:
"""Translate Responses-shape function tools to the Chat Completions nested shape.
Responses uses a flat shape per tool entry::
{"type": "function", "name": "...", "description": "...",
"parameters": {...}, "strict": true}
The Chat Completions / llama-server passthrough expects the nested shape::
{"type": "function",
"function": {"name": "...", "description": "...",
"parameters": {...}, "strict": true}}
Only ``type=="function"`` entries are forwarded. Built-in Responses tools
(``web_search``, ``file_search``, ``mcp``, ...) are dropped because
llama-server does not implement them server-side; keeping them in the
request would produce an opaque upstream 400.
"""
if not tools:
return None
out: list[dict] = []
for tool in tools:
if not isinstance(tool, dict):
continue
if tool.get("type") != "function":
continue
fn: dict = {}
if "name" in tool:
fn["name"] = tool["name"]
if tool.get("description") is not None:
fn["description"] = tool["description"]
if tool.get("parameters") is not None:
fn["parameters"] = tool["parameters"]
if tool.get("strict") is not None:
fn["strict"] = tool["strict"]
out.append({"type": "function", "function": fn})
return out or None
def _translate_responses_tool_choice_to_chat(tool_choice: Any) -> Any:
"""Translate a Responses-shape ``tool_choice`` to the Chat Completions shape.
String values (``"auto"``/``"none"``/``"required"``) pass through unchanged.
The Responses forcing object ``{"type": "function", "name": "X"}`` is
converted to Chat Completions' ``{"type": "function", "function": {"name": "X"}}``.
Unknown / built-in tool choices are forwarded as-is; llama-server ignores
what it doesn't recognise.
"""
if tool_choice is None:
return None
if isinstance(tool_choice, str):
return tool_choice
if (
isinstance(tool_choice, dict)
and tool_choice.get("type") == "function"
and "name" in tool_choice
and "function" not in tool_choice
):
return {"type": "function", "function": {"name": tool_choice["name"]}}
return tool_choice
def _responses_message_text(content: Union[str, list]) -> str:
"""Flatten a ResponsesInputMessage ``content`` into a plain text string.
Used for system/developer message hoisting and for assistant-replay
(``output_text``) messages when images/unknown parts are irrelevant.
Returns an empty string for empty input.
"""
if isinstance(content, str):
return content
parts: list[str] = []
for part in content or []:
if isinstance(part, (ResponsesInputTextPart, ResponsesOutputTextPart)):
parts.append(part.text)
return "\n".join(parts)
def _normalise_responses_input(payload: ResponsesRequest) -> list[ChatMessage]:
"""Convert a ResponsesRequest into a list of ChatMessage for the completions backend."""
"""Convert a ResponsesRequest's ``input`` into Chat-format ``ChatMessage`` list.
Handles the three input item shapes allowed by the Responses API:
- ``ResponsesInputMessage``: regular chat messages (text or multimodal).
- ``ResponsesFunctionCallInputItem``: a prior assistant tool call replayed
on a follow-up turn. Converted into an assistant message carrying a
Chat Completions ``tool_calls`` entry keyed by ``call_id``.
- ``ResponsesFunctionCallOutputInputItem``: a tool result the client is
returning. Converted into a ``role="tool"`` message with ``tool_call_id``
set to the originating ``call_id`` so llama-server can reconcile the
call with its result.
System / developer content is collected from ``instructions`` *and* from
any ``role="system"`` / ``role="developer"`` entries in ``input``, then
merged into a single ``role="system"`` message placed at the top of the
returned list. This satisfies strict chat templates (harmony / gpt-oss,
Qwen3, ...) whose Jinja raises ``"System message must be at the
beginning."`` when more than one system message is present or when a
system message appears after a user turn: the exact pattern the OpenAI
Codex CLI hits, since Codex sets ``instructions`` *and* also sends a
developer message in ``input``.
"""
system_parts: list[str] = []
messages: list[ChatMessage] = []
# System / developer instructions
if payload.instructions:
messages.append(ChatMessage(role = "system", content = payload.instructions))
system_parts.append(payload.instructions)
# Simple string input
if isinstance(payload.input, str):
if payload.input:
messages.append(ChatMessage(role = "user", content = payload.input))
if system_parts:
merged = "\n\n".join(p for p in system_parts if p)
return [ChatMessage(role = "system", content = merged), *messages]
return messages
# List of ResponsesInputMessage
for msg in payload.input:
role = "system" if msg.role == "developer" else msg.role
for item in payload.input:
if isinstance(item, ResponsesFunctionCallInputItem):
messages.append(
ChatMessage(
role = "assistant",
content = None,
tool_calls = [
{
"id": item.call_id,
"type": "function",
"function": {
"name": item.name,
"arguments": item.arguments,
},
}
],
)
)
continue
if isinstance(msg.content, str):
messages.append(ChatMessage(role = role, content = msg.content))
else:
# Convert Responses content parts -> Chat content parts
parts = []
for part in msg.content:
if isinstance(part, ResponsesInputTextPart):
parts.append(TextContentPart(type = "text", text = part.text))
elif isinstance(part, ResponsesInputImagePart):
parts.append(
ImageContentPart(
type = "image_url",
image_url = ImageUrl(url = part.image_url, detail = part.detail),
)
if isinstance(item, ResponsesFunctionCallOutputInputItem):
# Chat Completions `role="tool"` requires a string content; if a
# Responses client sends a content-array output, serialize it.
output = item.output
if not isinstance(output, str):
output = json.dumps(output)
messages.append(
ChatMessage(
role = "tool",
tool_call_id = item.call_id,
content = output,
)
)
continue
if isinstance(item, ResponsesUnknownInputItem):
# Reasoning items and any other unmodelled top-level Responses
# item types are silently dropped — llama-server-backed GGUFs
# cannot consume them, and our lenient validation lets them in so
# unrelated turns don't 422.
continue
# ResponsesInputMessage — hoist system/developer to the top, merge.
if item.role in ("system", "developer"):
hoisted = _responses_message_text(item.content)
if hoisted:
system_parts.append(hoisted)
continue
if isinstance(item.content, str):
messages.append(ChatMessage(role = item.role, content = item.content))
continue
# Assistant-replay turns come back as content = [output_text, ...].
# Chat Completions' assistant role expects a plain string, not a
# multimodal content array, so flatten output_text (and any stray
# input_text / unknown text) to a single string.
if item.role == "assistant":
text = _responses_message_text(item.content)
if text:
messages.append(ChatMessage(role = "assistant", content = text))
continue
# User (and any other remaining roles) — keep multimodal when
# present, drop unknown content parts silently.
parts: list = []
for part in item.content:
if isinstance(part, (ResponsesInputTextPart, ResponsesOutputTextPart)):
parts.append(TextContentPart(type = "text", text = part.text))
elif isinstance(part, ResponsesInputImagePart):
parts.append(
ImageContentPart(
type = "image_url",
image_url = ImageUrl(url = part.image_url, detail = part.detail),
)
messages.append(ChatMessage(role = role, content = parts if parts else ""))
)
# ResponsesUnknownContentPart and anything else: drop.
if parts:
# Collapse single-text-part content to a plain string so roles
# that reject multimodal arrays (e.g. legacy templates) still
# accept the message.
if len(parts) == 1 and isinstance(parts[0], TextContentPart):
messages.append(ChatMessage(role = item.role, content = parts[0].text))
else:
messages.append(ChatMessage(role = item.role, content = parts))
if system_parts:
merged = "\n\n".join(p for p in system_parts if p)
return [ChatMessage(role = "system", content = merged), *messages]
return messages
def _build_chat_request(
payload: ResponsesRequest, messages: list[ChatMessage], stream: bool
) -> ChatCompletionRequest:
"""Build a ChatCompletionRequest from a ResponsesRequest."""
chat_kwargs = dict(
"""Build a ChatCompletionRequest from a ResponsesRequest.
Tools and ``tool_choice`` are translated from the flat Responses shape to
the nested Chat Completions shape here so the existing #5099
``/v1/chat/completions`` client-side pass-through picks them up without
further modification.
"""
chat_kwargs: dict = dict(
model = payload.model,
messages = messages,
stream = stream,
@@ -2136,7 +2374,46 @@ def _build_chat_request(
chat_kwargs["top_p"] = payload.top_p
if payload.max_output_tokens is not None:
chat_kwargs["max_tokens"] = payload.max_output_tokens
return ChatCompletionRequest(**chat_kwargs)
chat_tools = _translate_responses_tools_to_chat(payload.tools)
if chat_tools is not None:
chat_kwargs["tools"] = chat_tools
chat_tool_choice = _translate_responses_tool_choice_to_chat(payload.tool_choice)
if chat_tool_choice is not None:
chat_kwargs["tool_choice"] = chat_tool_choice
req = ChatCompletionRequest(**chat_kwargs)
# `parallel_tool_calls` is not a first-class field on ChatCompletionRequest,
# but the model allows extras and _build_openai_passthrough_body forwards
# only explicitly-known fields. Llama-server does not currently implement
# parallel_tool_calls semantics, so we accept-and-ignore it on the
# Responses side to avoid breaking SDK clients that always send it.
return req
def _chat_tool_calls_to_responses_output(tool_calls: list[dict]) -> list[dict]:
"""Map Chat Completions ``tool_calls`` into Responses ``function_call`` output items.
The Chat Completions id (``call_xxx``) is the shared correlation key across
turns in the OpenAI Responses API: it is stored as ``call_id`` on the
output item and must be echoed back by the client as
``function_call_output.call_id`` on the next turn.
"""
items: list[dict] = []
for tc in tool_calls:
if tc.get("type") != "function":
continue
fn = tc.get("function") or {}
items.append(
ResponsesOutputFunctionCall(
call_id = tc.get("id", ""),
name = fn.get("name", ""),
arguments = fn.get("arguments", "") or "",
status = "completed",
).model_dump()
)
return items
async def _responses_non_streaming(
@@ -2156,35 +2433,44 @@
else:
body = result
# Extract content and usage from the Chat Completions response
choices = body.get("choices", [])
text = ""
tool_calls: list[dict] = []
if choices:
msg = choices[0].get("message", {})
msg = choices[0].get("message", {}) or {}
text = msg.get("content", "") or ""
tool_calls = msg.get("tool_calls") or []
usage_data = body.get("usage", {})
input_tokens = usage_data.get("prompt_tokens", 0)
output_tokens = usage_data.get("completion_tokens", 0)
resp_id = f"resp_{uuid.uuid4().hex[:12]}"
msg_id = f"msg_{uuid.uuid4().hex[:12]}"
# Responses API emits each tool call as its own top-level output item,
# alongside an optional assistant text message. Emit the text message
# only when the model actually produced content, so clients that expect
# a pure tool-call turn (finish_reason="tool_calls") don't see a spurious
# empty message item.
output_items: list[dict] = []
if text:
msg_id = f"msg_{uuid.uuid4().hex[:12]}"
output_items.append(
ResponsesOutputMessage(
id = msg_id,
status = "completed",
role = "assistant",
content = [ResponsesOutputTextContent(text = text)],
).model_dump()
)
output_items.extend(_chat_tool_calls_to_responses_output(tool_calls))
response = ResponsesResponse(
id = resp_id,
created_at = int(time.time()),
status = "completed",
model = body.get("model", payload.model),
output = [
ResponsesOutputMessage(
id = msg_id,
status = "completed",
role = "assistant",
content = [
ResponsesOutputTextContent(text = text),
],
),
],
output = output_items,
usage = ResponsesUsage(
input_tokens = input_tokens,
output_tokens = output_tokens,
@@ -2203,24 +2489,100 @@ async def _responses_stream(
messages: list[ChatMessage],
request: Request,
):
"""Handle a streaming Responses API call, emitting named SSE events."""
"""Handle a streaming Responses API call, emitting named SSE events.
For GGUF models the request goes directly to llama-server's
``/v1/chat/completions`` endpoint from inside the StreamingResponse
child task: a single httpx lifecycle, a single async generator.
Wrapping the existing ``openai_chat_completions`` pass-through (which
already does its own httpx lifecycle) stacks two generators: Python
3.13 + httpcore 1.0.x then loses the close-propagation chain on the
innermost ``HTTP11ConnectionByteStream`` at asyncgen finalisation,
tripping "Attempted to exit cancel scope in a different task" /
"async generator ignored GeneratorExit". The direct path avoids that
altogether. Non-GGUF falls back to the wrapper (which doesn't use
httpx, so the issue doesn't apply).
Text deltas arrive as ``response.output_text.delta`` on a single
``message`` output item at ``output_index=0``. Each tool call from
``delta.tool_calls[]`` is promoted to its own top-level ``function_call``
output item (one per distinct ``tool_calls[].index``), and relayed as
``response.function_call_arguments.delta`` / ``.done`` events so clients
(Codex, OpenAI Python SDK) can reconstruct the call incrementally and
reply with a ``function_call_output`` item on the next turn.
"""
resp_id = f"resp_{uuid.uuid4().hex[:12]}"
msg_id = f"msg_{uuid.uuid4().hex[:12]}"
item_id = f"item_{uuid.uuid4().hex[:12]}"
created_at = int(time.time())
chat_req = _build_chat_request(payload, messages, stream = True)
llama_backend = get_llama_cpp_backend()
if not llama_backend.is_loaded:
# The direct pass-through is GGUF-only. Non-GGUF /v1/responses
# streaming isn't a Codex-compatible path today and wrapping the
# transformers backend's streaming generator here would re-
# introduce the double-layer asyncgen close pattern that produces
# "Attempted to exit cancel scope in a different task" on Python
# 3.13. Surface a typed 400 so the client sees a useful error
# instead of a dangling stream.
raise HTTPException(
status_code = 400,
detail = (
"Streaming /v1/responses requires a GGUF model loaded via "
"llama-server. Use non-streaming /v1/responses, "
"/v1/chat/completions, or load a GGUF model."
),
)
body = _build_openai_passthrough_body(chat_req)
target_url = f"{llama_backend.base_url}/v1/chat/completions"
async def event_generator():
full_text = ""
input_tokens = 0
output_tokens = 0
# Per-tool-call state keyed by the Chat Completions `tool_calls[].index`
# which stays stable across chunks for the same call. Values are:
# {output_index, item_id, call_id, name, arguments, opened}
tool_call_state: dict[int, dict] = {}
# Text message lives at output_index 0; tool calls claim 1, 2, ...
next_output_index = 1
def _snapshot_output() -> list[dict]:
"""Snapshot of all completed output items for response.completed."""
items: list[dict] = [
{
"type": "message",
"id": msg_id,
"status": "completed",
"role": "assistant",
"content": [
{
"type": "output_text",
"text": full_text,
"annotations": [],
}
],
}
]
for st in sorted(tool_call_state.values(), key = lambda s: s["output_index"]):
items.append(
{
"type": "function_call",
"id": st["item_id"],
"status": "completed",
"call_id": st["call_id"],
"name": st["name"],
"arguments": st["arguments"],
}
)
return items
# ── Preamble events ──
yield f"event: response.created\ndata: {json.dumps({'type': 'response.created', 'response': {'id': resp_id, 'object': 'response', 'created_at': created_at, 'status': 'in_progress', 'model': payload.model, 'output': [], 'usage': {'input_tokens': 0, 'output_tokens': 0, 'total_tokens': 0}}})}\n\n"
# output_item.added
# output_item.added (text message at output_index 0)
output_item = {
"type": "message",
"id": msg_id,
@@ -2234,62 +2596,215 @@ async def _responses_stream(
content_part = {"type": "output_text", "text": "", "annotations": []}
yield f"event: response.content_part.added\ndata: {json.dumps({'type': 'response.content_part.added', 'item_id': msg_id, 'output_index': 0, 'content_index': 0, 'part': content_part})}\n\n"
# ── Direct httpx lifecycle to llama-server ──
# Full same-task open + close, identical pattern to
# _openai_passthrough_stream and _anthropic_passthrough_stream:
# no `async with`, explicit aclose of lines_iter BEFORE resp /
# client so the innermost httpcore byte stream is finalised in
# this task (not via Python's asyncgen GC in a sibling task).
client = httpx.AsyncClient(timeout = 600)
resp = None
lines_iter = None
try:
req = client.build_request("POST", target_url, json = body)
try:
resp = await client.send(req, stream = True)
except httpx.RequestError as e:
logger.error("responses stream: upstream unreachable: %s", e)
yield f"event: response.failed\ndata: {json.dumps({'type': 'response.failed', 'response': {'id': resp_id, 'object': 'response', 'created_at': created_at, 'status': 'failed', 'model': payload.model, 'output': [], 'error': {'code': 502, 'message': _friendly_error(e)}}})}\n\n"
return
if resp.status_code != 200:
err_bytes = await resp.aread()
err_text = err_bytes.decode("utf-8", errors = "replace")
logger.error(
"responses stream upstream error: status=%s body=%s",
resp.status_code,
err_text[:500],
)
yield f"event: response.failed\ndata: {json.dumps({'type': 'response.failed', 'response': {'id': resp_id, 'object': 'response', 'created_at': created_at, 'status': 'failed', 'model': payload.model, 'output': [], 'error': {'code': resp.status_code, 'message': f'llama-server error: {err_text[:500]}'}}})}\n\n"
return
lines_iter = resp.aiter_lines()
async for raw_line in lines_iter:
if await request.is_disconnected():
break
if not raw_line:
continue
if not raw_line.startswith("data: "):
continue
data_str = raw_line[6:]
if data_str.strip() == "[DONE]":
break
try:
chunk_data = json.loads(data_str)
except json.JSONDecodeError:
continue
choices = chunk_data.get("choices", [])
if not choices:
usage = chunk_data.get("usage")
if usage:
input_tokens = usage.get("prompt_tokens", input_tokens)
output_tokens = usage.get("completion_tokens", output_tokens)
continue
delta = choices[0].get("delta", {}) or {}
content = delta.get("content")
if content:
full_text += content
delta_event = {
"type": "response.output_text.delta",
"item_id": msg_id,
"output_index": 0,
"content_index": 0,
"delta": content,
}
yield f"event: response.output_text.delta\ndata: {json.dumps(delta_event)}\n\n"
for tc in delta.get("tool_calls") or []:
idx = tc.get("index", 0)
st = tool_call_state.get(idx)
fn = tc.get("function") or {}
if st is None:
# First chunk for this tool call — allocate an
# output_index and emit output_item.added.
st = {
"output_index": next_output_index,
"item_id": f"fc_{uuid.uuid4().hex[:12]}",
"call_id": tc.get("id") or "",
"name": fn.get("name") or "",
"arguments": "",
"opened": False,
}
next_output_index += 1
tool_call_state[idx] = st
else:
# Later chunks sometimes carry the id/name only
# once; merge when present.
if tc.get("id") and not st["call_id"]:
st["call_id"] = tc["id"]
if fn.get("name") and not st["name"]:
st["name"] = fn["name"]
if not st["opened"] and st["call_id"] and st["name"]:
item_added = {
"type": "response.output_item.added",
"output_index": st["output_index"],
"item": {
"type": "function_call",
"id": st["item_id"],
"status": "in_progress",
"call_id": st["call_id"],
"name": st["name"],
"arguments": "",
},
}
yield f"event: response.output_item.added\ndata: {json.dumps(item_added)}\n\n"
st["opened"] = True
arg_delta = fn.get("arguments") or ""
if arg_delta and st["opened"]:
st["arguments"] += arg_delta
args_delta_event = {
"type": "response.function_call_arguments.delta",
"item_id": st["item_id"],
"output_index": st["output_index"],
"delta": arg_delta,
}
yield f"event: response.function_call_arguments.delta\ndata: {json.dumps(args_delta_event)}\n\n"
elif arg_delta:
# Buffer the args until we can open the item
# (id/name arrive in the same chunk as the first
# arg delta for some models — but if not, stash).
st["arguments"] += arg_delta
usage = chunk_data.get("usage")
if usage:
input_tokens = usage.get("prompt_tokens", input_tokens)
output_tokens = usage.get("completion_tokens", output_tokens)
except Exception as e:
logger.error("responses stream error: %s", e)
finally:
if lines_iter is not None:
try:
await lines_iter.aclose()
except Exception:
pass
if resp is not None:
try:
await resp.aclose()
except Exception:
pass
try:
await client.aclose()
except Exception:
pass
# ── Closing events for tool calls ──
for st in sorted(tool_call_state.values(), key = lambda s: s["output_index"]):
# If id/name never arrived (malformed upstream), synthesise so
# the client still sees a coherent frame sequence.
if not st["opened"]:
if not st["call_id"]:
st["call_id"] = f"call_{uuid.uuid4().hex[:12]}"
item_added = {
"type": "response.output_item.added",
"output_index": st["output_index"],
"item": {
"type": "function_call",
"id": st["item_id"],
"status": "in_progress",
"call_id": st["call_id"],
"name": st["name"],
"arguments": "",
},
}
yield f"event: response.output_item.added\ndata: {json.dumps(item_added)}\n\n"
if st["arguments"]:
yield (
"event: response.function_call_arguments.delta\n"
"data: "
+ json.dumps(
{
"type": "response.function_call_arguments.delta",
"item_id": st["item_id"],
"output_index": st["output_index"],
"delta": st["arguments"],
}
)
+ "\n\n"
)
st["opened"] = True
args_done = {
"type": "response.function_call_arguments.done",
"item_id": st["item_id"],
"output_index": st["output_index"],
"name": st["name"],
"arguments": st["arguments"],
}
yield f"event: response.function_call_arguments.done\ndata: {json.dumps(args_done)}\n\n"
item_done = {
"type": "response.output_item.done",
"output_index": st["output_index"],
"item": {
"type": "function_call",
"id": st["item_id"],
"status": "completed",
"call_id": st["call_id"],
"name": st["name"],
"arguments": st["arguments"],
},
}
yield f"event: response.output_item.done\ndata: {json.dumps(item_done)}\n\n"
# ── Closing events for text message ──
yield f"event: response.output_text.done\ndata: {json.dumps({'type': 'response.output_text.done', 'item_id': msg_id, 'output_index': 0, 'content_index': 0, 'text': full_text})}\n\n"
# content_part.done
yield f"event: response.content_part.done\ndata: {json.dumps({'type': 'response.content_part.done', 'item_id': msg_id, 'output_index': 0, 'content_index': 0, 'part': {'type': 'output_text', 'text': full_text, 'annotations': []}})}\n\n"
# output_item.done
yield f"event: response.output_item.done\ndata: {json.dumps({'type': 'response.output_item.done', 'output_index': 0, 'item': {'type': 'message', 'id': msg_id, 'status': 'completed', 'role': 'assistant', 'content': [{'type': 'output_text', 'text': full_text, 'annotations': []}]}})}\n\n"
# response.completed
@@ -2302,21 +2817,7 @@ async def _responses_stream(
"created_at": created_at,
"status": "completed",
"model": payload.model,
"output": [
{
"type": "message",
"id": msg_id,
"status": "completed",
"role": "assistant",
"content": [
{
"type": "output_text",
"text": full_text,
"annotations": [],
}
],
}
],
"output": _snapshot_output(),
"usage": {
"input_tokens": input_tokens,
"output_tokens": output_tokens,


@@ -0,0 +1,667 @@
# SPDX-License-Identifier: AGPL-3.0-only
# Copyright 2026-present the Unsloth AI Inc. team. All rights reserved.
"""
Tests for the OpenAI /v1/responses client-side function-calling pass-through.
Covers:
- ResponsesRequest accepts Responses-shape `tools`, `tool_choice`,
`parallel_tool_calls`, and the `function_call` / `function_call_output`
input items used for multi-turn tool loops.
- _translate_responses_tools_to_chat() converts the flat Responses tool
shape to the nested Chat Completions shape, drops non-function built-in
tools, and returns None for empty lists.
- _translate_responses_tool_choice_to_chat() passes string choices through
and converts {type:function,name:X} to Chat Completions' nested shape.
- _normalise_responses_input() maps function_call_output items to
role="tool" ChatMessages with tool_call_id, and function_call items to
assistant messages with tool_calls.
- _chat_tool_calls_to_responses_output() preserves call_id and drops
non-function tool calls.
- ResponsesOutputFunctionCall and ResponsesResponse round-trip tool-call
outputs without losing fields.
No running server or GPU required.
"""
import os
import sys
_backend = os.path.join(os.path.dirname(__file__), "..")
sys.path.insert(0, _backend)
import json
import pytest
from pydantic import ValidationError
from models.inference import (
ChatMessage,
ResponsesFunctionCallInputItem,
ResponsesFunctionCallOutputInputItem,
ResponsesFunctionTool,
ResponsesInputMessage,
ResponsesOutputFunctionCall,
ResponsesOutputMessage,
ResponsesOutputTextContent,
ResponsesOutputTextPart,
ResponsesRequest,
ResponsesResponse,
ResponsesUnknownContentPart,
ResponsesUnknownInputItem,
ResponsesUsage,
)
from routes.inference import (
_chat_tool_calls_to_responses_output,
_normalise_responses_input,
_translate_responses_tool_choice_to_chat,
_translate_responses_tools_to_chat,
)
# =====================================================================
# Request model — tools / tool_choice / parallel_tool_calls
# =====================================================================
class TestResponsesRequestTools:
def test_flat_function_tool_accepted(self):
req = ResponsesRequest(
input = "hi",
tools = [
{
"type": "function",
"name": "get_weather",
"description": "Get the weather for a city.",
"parameters": {
"type": "object",
"properties": {"city": {"type": "string"}},
"required": ["city"],
},
"strict": True,
}
],
)
assert req.tools is not None
assert req.tools[0]["name"] == "get_weather"
assert req.tools[0]["type"] == "function"
assert req.tools[0]["strict"] is True
def test_tool_choice_string_values(self):
for choice in ("auto", "required", "none"):
req = ResponsesRequest(input = "hi", tool_choice = choice)
assert req.tool_choice == choice
def test_tool_choice_forcing_object(self):
req = ResponsesRequest(
input = "hi",
tool_choice = {"type": "function", "name": "get_weather"},
)
assert req.tool_choice == {"type": "function", "name": "get_weather"}
def test_parallel_tool_calls(self):
req = ResponsesRequest(input = "hi", parallel_tool_calls = True)
assert req.parallel_tool_calls is True
def test_builtin_tool_type_passes_validation(self):
"""Non-function built-in tools (web_search, file_search, mcp, ...) must
not raise at request validation so SDKs that default to them don't
fail on Studio; they are filtered out during translation."""
req = ResponsesRequest(
input = "hi",
tools = [{"type": "web_search_preview"}],
)
assert req.tools == [{"type": "web_search_preview"}]
def test_function_tool_model_direct(self):
tool = ResponsesFunctionTool(
type = "function",
name = "send_email",
parameters = {"type": "object", "properties": {}},
)
assert tool.name == "send_email"
assert tool.description is None
def test_function_tool_rejects_other_type(self):
with pytest.raises(ValidationError):
ResponsesFunctionTool(type = "web_search", name = "x")
# =====================================================================
# Request model — function_call / function_call_output input items
# =====================================================================
class TestResponsesMultiTurnInput:
def test_function_call_input_item(self):
req = ResponsesRequest(
input = [
{"role": "user", "content": "Weather in Paris?"},
{
"type": "function_call",
"id": "fc_abc",
"call_id": "call_abc",
"name": "get_weather",
"arguments": '{"city": "Paris"}',
},
{
"type": "function_call_output",
"call_id": "call_abc",
"output": '{"temp": 12}',
},
],
)
assert len(req.input) == 3
assert isinstance(req.input[1], ResponsesFunctionCallInputItem)
assert req.input[1].call_id == "call_abc"
assert isinstance(req.input[2], ResponsesFunctionCallOutputInputItem)
assert req.input[2].call_id == "call_abc"
assert req.input[2].output == '{"temp": 12}'
def test_function_call_output_missing_call_id_rejected(self):
with pytest.raises(ValidationError):
ResponsesFunctionCallOutputInputItem(
type = "function_call_output", output = "x"
)
def test_function_call_output_accepts_content_array(self):
item = ResponsesFunctionCallOutputInputItem(
type = "function_call_output",
call_id = "call_1",
output = [{"type": "output_text", "text": "done"}],
)
assert isinstance(item.output, list)
# =====================================================================
# Translators — tools, tool_choice
# =====================================================================
class TestToolsTranslation:
def test_flat_to_nested(self):
tools = [
{
"type": "function",
"name": "get_weather",
"description": "Returns weather.",
"parameters": {"type": "object"},
"strict": True,
}
]
out = _translate_responses_tools_to_chat(tools)
assert out == [
{
"type": "function",
"function": {
"name": "get_weather",
"description": "Returns weather.",
"parameters": {"type": "object"},
"strict": True,
},
}
]
def test_builtin_tools_dropped(self):
out = _translate_responses_tools_to_chat(
[
{"type": "web_search_preview"},
{"type": "file_search"},
{
"type": "function",
"name": "search",
"parameters": {"type": "object"},
},
]
)
assert len(out) == 1
assert out[0]["function"]["name"] == "search"
def test_empty_returns_none(self):
assert _translate_responses_tools_to_chat(None) is None
assert _translate_responses_tools_to_chat([]) is None
def test_only_builtin_tools_returns_none(self):
assert (
_translate_responses_tools_to_chat([{"type": "web_search_preview"}]) is None
)
def test_description_optional(self):
out = _translate_responses_tools_to_chat(
[
{
"type": "function",
"name": "noop",
"parameters": {"type": "object"},
}
]
)
assert "description" not in out[0]["function"]
class TestToolChoiceTranslation:
def test_string_passthrough(self):
for v in ("auto", "required", "none"):
assert _translate_responses_tool_choice_to_chat(v) == v
def test_none_passthrough(self):
assert _translate_responses_tool_choice_to_chat(None) is None
def test_forcing_object_converted(self):
assert _translate_responses_tool_choice_to_chat(
{"type": "function", "name": "get_weather"}
) == {"type": "function", "function": {"name": "get_weather"}}
def test_already_chat_nested_shape_passes_through(self):
"""If a client happens to send the Chat Completions nested shape,
we don't double-wrap it."""
already_nested = {"type": "function", "function": {"name": "get_weather"}}
assert (
_translate_responses_tool_choice_to_chat(already_nested) == already_nested
)
def test_unknown_shape_passes_through(self):
obj = {"type": "allowed_tools", "tools": [{"type": "function", "name": "x"}]}
assert _translate_responses_tool_choice_to_chat(obj) == obj
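The behaviour pinned by the two classes above can be sketched as follows. This is a hypothetical reimplementation for illustration; the real `_translate_responses_tools_to_chat` / `_translate_responses_tool_choice_to_chat` live in `routes.inference` and may differ in detail.

```python
# Illustrative sketch only, mirroring the behaviour the tests above pin down.

def translate_tools(tools):
    """Flat Responses tools -> nested Chat Completions tools (or None)."""
    out = []
    for tool in tools or []:
        if tool.get("type") != "function":
            continue  # built-ins (web_search_preview, file_search, ...) are dropped
        fn = {"name": tool["name"], "parameters": tool.get("parameters")}
        if tool.get("description") is not None:
            fn["description"] = tool["description"]
        if tool.get("strict") is not None:
            fn["strict"] = tool["strict"]
        out.append({"type": "function", "function": fn})
    return out or None

def translate_tool_choice(choice):
    """Strings and unknown shapes pass through; {type, name} is nested."""
    if isinstance(choice, dict) and choice.get("type") == "function" and "name" in choice:
        return {"type": "function", "function": {"name": choice["name"]}}
    return choice
```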
# =====================================================================
# _normalise_responses_input — multi-turn tool mapping
# =====================================================================
class TestNormaliseResponsesInputWithTools:
def test_function_call_output_maps_to_tool_role(self):
payload = ResponsesRequest(
input = [
{"role": "user", "content": "Weather?"},
{
"type": "function_call",
"call_id": "call_1",
"name": "get_weather",
"arguments": "{}",
},
{
"type": "function_call_output",
"call_id": "call_1",
"output": '{"temp": 20}',
},
],
)
msgs = _normalise_responses_input(payload)
assert len(msgs) == 3
assert msgs[0].role == "user"
assert msgs[1].role == "assistant"
assert msgs[1].tool_calls is not None
assert msgs[1].tool_calls[0]["id"] == "call_1"
assert msgs[1].tool_calls[0]["function"]["name"] == "get_weather"
assert msgs[2].role == "tool"
assert msgs[2].tool_call_id == "call_1"
assert msgs[2].content == '{"temp": 20}'
def test_instructions_plus_developer_message_are_merged(self):
"""Codex CLI sends `instructions` (system prompt) AND a developer
message in `input`. Strict chat templates (harmony / gpt-oss, Qwen3,
...) raise "System message must be at the beginning" when two
separate system-role messages appear, so we must emit exactly one
merged system message at the top.
"""
payload = ResponsesRequest(
instructions = "Base instructions.",
input = [
{"role": "developer", "content": "Developer override."},
{"role": "user", "content": "Hi"},
],
)
msgs = _normalise_responses_input(payload)
system_roles = [m for m in msgs if m.role == "system"]
assert len(system_roles) == 1
assert "Base instructions." in system_roles[0].content
assert "Developer override." in system_roles[0].content
# System must be the very first message for strict templates.
assert msgs[0].role == "system"
assert msgs[1].role == "user"
def test_developer_message_after_user_is_still_hoisted(self):
"""Multi-turn conversations where a developer message appears after
user turns must still produce a single leading system message, not
a mid-conversation system that strict templates reject."""
payload = ResponsesRequest(
input = [
{"role": "user", "content": "Hello"},
{"role": "assistant", "content": "Hi!"},
{"role": "developer", "content": "Updated rules."},
{"role": "user", "content": "Continue"},
],
)
msgs = _normalise_responses_input(payload)
assert msgs[0].role == "system"
assert "Updated rules." in msgs[0].content
for m in msgs[1:]:
assert m.role != "system", "no trailing system message permitted"
def test_no_system_output_when_no_system_input(self):
payload = ResponsesRequest(input = "Hi")
msgs = _normalise_responses_input(payload)
assert all(m.role != "system" for m in msgs)
def test_multiple_system_messages_in_input_are_merged(self):
payload = ResponsesRequest(
input = [
{"role": "system", "content": "A"},
{"role": "system", "content": "B"},
{"role": "user", "content": "Hi"},
],
)
msgs = _normalise_responses_input(payload)
assert sum(1 for m in msgs if m.role == "system") == 1
assert "A" in msgs[0].content and "B" in msgs[0].content
def test_content_array_output_serialised_to_json_string(self):
payload = ResponsesRequest(
input = [
{
"type": "function_call_output",
"call_id": "call_1",
"output": [{"type": "output_text", "text": "ok"}],
}
],
)
msgs = _normalise_responses_input(payload)
assert msgs[0].role == "tool"
# Content is serialised so llama-server sees a string.
assert json.loads(msgs[0].content) == [{"type": "output_text", "text": "ok"}]
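The hoist-and-merge step these tests exercise can be sketched on plain dicts (hypothetical; the real `_normalise_responses_input` operates on ChatMessage models and also folds in `instructions`):

```python
# Hypothetical sketch of the system-message hoist the tests above verify.

def merge_system_messages(messages: list[dict]) -> list[dict]:
    """Hoist all system/developer content into one leading system message."""
    system_parts = [m["content"] for m in messages
                    if m["role"] in ("system", "developer")]
    rest = [m for m in messages if m["role"] not in ("system", "developer")]
    if not system_parts:
        return rest
    merged = {"role": "system", "content": "\n\n".join(system_parts)}
    return [merged] + rest
```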
# =====================================================================
# Response mapping — tool_calls → function_call output items
# =====================================================================
class TestChatToolCallsToResponsesOutput:
def test_basic_mapping(self):
items = _chat_tool_calls_to_responses_output(
[
{
"id": "call_abc",
"type": "function",
"function": {
"name": "get_weather",
"arguments": '{"city":"Paris"}',
},
}
]
)
assert len(items) == 1
assert items[0]["type"] == "function_call"
assert items[0]["call_id"] == "call_abc"
assert items[0]["name"] == "get_weather"
assert items[0]["arguments"] == '{"city":"Paris"}'
assert items[0]["status"] == "completed"
assert items[0]["id"].startswith("fc_")
def test_multiple_tool_calls_preserved(self):
items = _chat_tool_calls_to_responses_output(
[
{
"id": "call_1",
"type": "function",
"function": {"name": "a", "arguments": "{}"},
},
{
"id": "call_2",
"type": "function",
"function": {"name": "b", "arguments": "{}"},
},
]
)
assert [it["call_id"] for it in items] == ["call_1", "call_2"]
def test_non_function_tool_call_dropped(self):
items = _chat_tool_calls_to_responses_output([{"id": "x", "type": "retrieval"}])
assert items == []
def test_missing_arguments_coerced_to_empty_string(self):
items = _chat_tool_calls_to_responses_output(
[{"id": "call_1", "type": "function", "function": {"name": "x"}}]
)
assert items[0]["arguments"] == ""
# =====================================================================
# Response model — ResponsesOutputFunctionCall / mixed output
# =====================================================================
class TestResponsesOutputFunctionCall:
def test_direct_construction(self):
fc = ResponsesOutputFunctionCall(
call_id = "call_1",
name = "get_weather",
arguments = '{"city":"Paris"}',
)
d = fc.model_dump()
assert d["type"] == "function_call"
assert d["call_id"] == "call_1"
assert d["status"] == "completed"
assert d["id"].startswith("fc_")
def test_response_with_tool_call_output(self):
resp = ResponsesResponse(
model = "test",
output = [
ResponsesOutputFunctionCall(
call_id = "call_1",
name = "get_weather",
arguments = "{}",
)
],
usage = ResponsesUsage(input_tokens = 1, output_tokens = 1, total_tokens = 2),
)
d = json.loads(resp.model_dump_json())
assert d["output"][0]["type"] == "function_call"
assert d["output"][0]["call_id"] == "call_1"
def test_response_with_mixed_output(self):
resp = ResponsesResponse(
model = "test",
output = [
ResponsesOutputMessage(
content = [ResponsesOutputTextContent(text = "Calling...")],
),
ResponsesOutputFunctionCall(
call_id = "call_1",
name = "get_weather",
arguments = '{"city":"Paris"}',
),
],
)
d = resp.model_dump()
assert d["output"][0]["type"] == "message"
assert d["output"][1]["type"] == "function_call"
# =====================================================================
# Regression: ChatMessage validator still accepts mapped tool messages
# =====================================================================
class TestCodexStyleRequestShapes:
"""Regression tests for the request shapes OpenAI Codex CLI sends."""
def test_assistant_replay_output_text_accepted(self):
"""Codex replays prior assistant turns with `output_text` content.
Before, this triggered a 422 on every turn after the first."""
req = ResponsesRequest(
input = [
{"role": "user", "content": "Hi"},
{
"type": "message",
"role": "assistant",
"content": [
{
"type": "output_text",
"text": "Hello!",
"annotations": [],
"logprobs": [],
}
],
},
{"role": "user", "content": "Continue"},
],
)
assert len(req.input) == 3
parts = req.input[1].content
assert isinstance(parts, list)
assert isinstance(parts[0], ResponsesOutputTextPart)
assert parts[0].text == "Hello!"
def test_reasoning_item_accepted_as_unknown(self):
"""`reasoning` items replayed from prior o-series turns must not
fail validation; Codex preserves them in multi-turn."""
req = ResponsesRequest(
input = [
{"role": "user", "content": "Hi"},
{
"type": "reasoning",
"id": "rs_1",
"summary": [],
"encrypted_content": "opaque",
},
{"role": "assistant", "content": "Hello!"},
],
)
assert len(req.input) == 3
assert isinstance(req.input[1], ResponsesUnknownInputItem)
def test_unknown_content_part_type_accepted(self):
"""Unknown content-part types (e.g. future input_audio) validate as
ResponsesUnknownContentPart so the whole request doesn't 422."""
req = ResponsesRequest(
input = [
{
"role": "user",
"content": [
{"type": "input_text", "text": "See:"},
{"type": "input_audio", "audio": {"data": "..."}},
],
}
],
)
parts = req.input[0].content
assert isinstance(parts[1], ResponsesUnknownContentPart)
assert parts[1].type == "input_audio"
def test_codex_full_shape_roundtrip(self):
"""End-to-end: developer + user + assistant(output_text) +
function_call + function_call_output + reasoning in one request."""
payload = ResponsesRequest(
instructions = "Base instructions.",
input = [
{
"type": "message",
"role": "developer",
"content": [{"type": "input_text", "text": "Dev override."}],
},
{
"type": "message",
"role": "user",
"content": [{"type": "input_text", "text": "Weather?"}],
},
{
"type": "reasoning",
"id": "rs_1",
"summary": [],
},
{
"type": "function_call",
"call_id": "call_1",
"name": "get_weather",
"arguments": "{}",
},
{
"type": "function_call_output",
"call_id": "call_1",
"output": '{"temp":20}',
},
{
"type": "message",
"role": "assistant",
"content": [
{
"type": "output_text",
"text": "It's 20°C.",
"annotations": [],
"logprobs": [],
}
],
},
{"role": "user", "content": "And tomorrow?"},
],
)
msgs = _normalise_responses_input(payload)
# Single leading merged system; no mid-conversation system.
assert msgs[0].role == "system"
assert sum(1 for m in msgs if m.role == "system") == 1
assert "Base instructions." in msgs[0].content
assert "Dev override." in msgs[0].content
roles = [m.role for m in msgs[1:]]
# Reasoning item is dropped. Order: user, assistant(tool_calls),
# tool, assistant(text), user.
assert roles == ["user", "assistant", "tool", "assistant", "user"]
assert msgs[2].tool_calls is not None
assert msgs[3].role == "tool"
assert msgs[3].tool_call_id == "call_1"
assert msgs[4].content == "It's 20°C."
def test_single_output_text_part_flattens_to_string(self):
"""ChatMessage assistant role prefers plain string content — tests
confirm we don't forward a single-part array that would otherwise
force legacy chat templates into multimodal handling."""
payload = ResponsesRequest(
input = [
{
"role": "assistant",
"content": [
{"type": "output_text", "text": "ok", "annotations": []}
],
},
{"role": "user", "content": "next"},
],
)
msgs = _normalise_responses_input(payload)
assert msgs[0].role == "assistant"
assert msgs[0].content == "ok"
class TestTranslatedMessagesValidate:
"""Verify that the messages produced by _normalise_responses_input
satisfy ChatMessage's role-shape validator so the downstream /v1/chat/
completions pass-through does not reject them."""
def test_round_trip_multi_turn(self):
payload = ResponsesRequest(
input = [
{"role": "user", "content": "Weather in Paris?"},
{
"type": "function_call",
"call_id": "call_1",
"name": "get_weather",
"arguments": '{"city": "Paris"}',
},
{
"type": "function_call_output",
"call_id": "call_1",
"output": '{"temp": 20}',
},
{"role": "user", "content": "Thanks!"},
],
)
msgs = _normalise_responses_input(payload)
for m in msgs:
# Constructing a fresh ChatMessage from the dump round-trips the
# role-shape validator — the key invariant for the passthrough.
ChatMessage(**m.model_dump(exclude_none = True))