diff --git a/packages/providers/src/community/pi/capabilities.ts b/packages/providers/src/community/pi/capabilities.ts index c729dd79..335256d0 100644 --- a/packages/providers/src/community/pi/capabilities.ts +++ b/packages/providers/src/community/pi/capabilities.ts @@ -8,6 +8,13 @@ import type { ProviderCapabilities } from '../../types'; * envInjection covers both auth-key passthrough (setRuntimeApiKey for mapped * provider env vars) and bash tool subprocess env (BashSpawnHook merges the * caller's env over Pi's inherited baseline), matching Claude/Codex semantics. + * + * structuredOutput is best-effort (not SDK-enforced like Claude/Codex): the + * provider appends a "JSON only" instruction + the schema to the prompt and + * the event bridge parses the final assistant transcript on agent_end. + * Reliable on instruction-following models (GPT-5, Claude, Gemini 2.x, + * recent Qwen Coder, DeepSeek V3); parse failures degrade via the + * dag-executor's existing dag.structured_output_missing path. */ export const PI_CAPABILITIES: ProviderCapabilities = { sessionResume: true, @@ -16,7 +23,7 @@ export const PI_CAPABILITIES: ProviderCapabilities = { skills: true, agents: false, toolRestrictions: true, - structuredOutput: false, + structuredOutput: true, envInjection: true, costControl: false, effortControl: true, diff --git a/packages/providers/src/community/pi/event-bridge.test.ts b/packages/providers/src/community/pi/event-bridge.test.ts index bbc716bd..cc176f79 100644 --- a/packages/providers/src/community/pi/event-bridge.test.ts +++ b/packages/providers/src/community/pi/event-bridge.test.ts @@ -5,6 +5,7 @@ import { buildResultChunk, mapPiEvent, serializeToolResult, + tryParseStructuredOutput, usageToTokens, } from './event-bridge'; @@ -367,3 +368,56 @@ describe('mapPiEvent', () => { ).toEqual([]); }); }); + +// ─── tryParseStructuredOutput ────────────────────────────────────────────── + +describe('tryParseStructuredOutput', () => { + test('parses clean JSON object', () => { + expect(tryParseStructuredOutput('{"name":"alpha","count":3}')).toEqual({ + name: 'alpha', + count: 3, + }); + }); + + test('parses JSON with surrounding whitespace', () => { + expect(tryParseStructuredOutput(' \n{"ok":true}\n ')).toEqual({ ok: true }); + }); + + test('strips ```json fences', () => { + const fenced = '```json\n{"area":"web","confidence":0.9}\n```'; + expect(tryParseStructuredOutput(fenced)).toEqual({ area: 'web', confidence: 0.9 }); + }); + + test('strips bare ``` fences', () => { + expect(tryParseStructuredOutput('```\n{"ok":1}\n```')).toEqual({ ok: 1 }); + }); + + test('parses JSON arrays', () => { + expect(tryParseStructuredOutput('[1,2,3]')).toEqual([1, 2, 3]); + }); + + test('returns undefined on empty string', () => { + expect(tryParseStructuredOutput('')).toBeUndefined(); + expect(tryParseStructuredOutput(' ')).toBeUndefined(); + }); + + test('returns undefined when model wraps JSON in prose', () => { + // Realistic failure mode — model ignores "JSON only" instruction and adds + // explanatory text before/after. Caller degrades via the executor's + // missing-structured-output warning path. + const prose = + 'Here is the JSON you requested:\n{"ok":true}\nLet me know if you need anything else.'; + expect(tryParseStructuredOutput(prose)).toBeUndefined(); + }); + + test('returns undefined on malformed JSON', () => { + expect(tryParseStructuredOutput('{not valid}')).toBeUndefined(); + expect(tryParseStructuredOutput('{"unclosed":')).toBeUndefined(); + }); + + test('preserves backticks inside JSON string values', () => { + // Fence stripper matches only at start/end; inner backticks must survive. + const withBackticks = '{"code":"run `npm test`"}'; + expect(tryParseStructuredOutput(withBackticks)).toEqual({ code: 'run `npm test`' }); + }); +}); diff --git a/packages/providers/src/community/pi/event-bridge.ts b/packages/providers/src/community/pi/event-bridge.ts index 1e975fbf..21d7301f 100644 --- a/packages/providers/src/community/pi/event-bridge.ts +++ b/packages/providers/src/community/pi/event-bridge.ts @@ -151,6 +151,33 @@ export function buildResultChunk(messages: readonly unknown[]): MessageChunk { return chunk; } +/** + * Attempt to parse a Pi assistant transcript as the structured-output JSON + * requested via `outputFormat`. Handles two common model failure modes: + * - trailing/leading whitespace (always stripped) + * - markdown code fences (```json ... ``` or bare ``` ... ```) that models + * emit despite the "no code fences" instruction in the prompt + * + * Returns the parsed value on success, `undefined` on any failure. Callers + * treat `undefined` as "structured output unavailable" and degrade via the + * dag-executor's existing missing-structured-output warning. + */ +export function tryParseStructuredOutput(text: string): unknown { + const trimmed = text.trim(); + if (trimmed.length === 0) return undefined; + // Strip ```json / ``` fences if present. Match only at boundaries so we + // don't mangle JSON strings that legitimately contain backticks. + const cleaned = trimmed + .replace(/^```(?:json)?\s*\n?/i, '') + .replace(/\n?\s*```\s*$/, '') + .trim(); + try { + return JSON.parse(cleaned); + } catch { + return undefined; + } +} + /** * Pure mapper from Pi's `AgentSessionEvent` → zero-or-more Archon `MessageChunk`s. * @@ -243,13 +270,22 @@ export type BridgeQueueItem = export async function* bridgeSession( session: AgentSession, prompt: string, - abortSignal?: AbortSignal + abortSignal?: AbortSignal, + jsonSchema?: Record ): AsyncGenerator { const queue = new AsyncQueue(); + // Best-effort structured-output buffer. Only accumulates when the caller + // requested a JSON schema; otherwise stays empty and the terminal chunk + // passes through untouched. + const wantsStructured = jsonSchema !== undefined; + let assistantBuffer = ''; const unsubscribe = session.subscribe((event: AgentSessionEvent) => { try { for (const chunk of mapPiEvent(event)) { + if (wantsStructured && chunk.type === 'assistant') { + assistantBuffer += chunk.content; + } queue.push({ kind: 'chunk', chunk }); } } catch (err) { @@ -291,8 +327,28 @@ export async function* bridgeSession( // Pi's session.sessionId is always a UUID (even for in-memory); we emit // it unconditionally and let the caller decide whether resume is // meaningful (capability-gated at the registry level). - if (item.chunk.type === 'result' && session.sessionId) { - yield { ...item.chunk, sessionId: session.sessionId }; + if (item.chunk.type === 'result') { + let terminal: MessageChunk = item.chunk; + if (session.sessionId) { + terminal = { ...terminal, sessionId: session.sessionId }; + } + // Best-effort structured output: parse the accumulated assistant + // transcript as JSON and attach. On parse failure, leave it off — + // the dag-executor's existing dag.structured_output_missing path + // warns and downstream $node.output.field refs degrade to '' instead + // of propagating bogus data. + if (wantsStructured) { + const parsed = tryParseStructuredOutput(assistantBuffer); + if (parsed !== undefined) { + terminal = { ...terminal, structuredOutput: parsed }; + } else { + getLog().warn( + { bufferLength: assistantBuffer.length }, + 'pi.event-bridge.structured_output_parse_failed' + ); + } + } + yield terminal; } else { yield item.chunk; } diff --git a/packages/providers/src/community/pi/provider.test.ts b/packages/providers/src/community/pi/provider.test.ts index f5e7f90b..135ee17d 100644 --- a/packages/providers/src/community/pi/provider.test.ts +++ b/packages/providers/src/community/pi/provider.test.ts @@ -902,10 +902,11 @@ describe('PiProvider', () => { expect(caps.skills).toBe(true); expect(caps.sessionResume).toBe(true); expect(caps.envInjection).toBe(true); + // Best-effort structured output via prompt engineering (not SDK-enforced). + expect(caps.structuredOutput).toBe(true); // Still false: expect(caps.mcp).toBe(false); expect(caps.hooks).toBe(false); - expect(caps.structuredOutput).toBe(false); }); test('nodeConfig.skills with unknown name yields system warning, does not abort', async () => { @@ -1047,4 +1048,140 @@ describe('PiProvider', () => { ); expect(systemChunks.some(c => c.content.includes('sonnet-5 not available'))).toBe(true); }); + + // ─── structured output (best-effort JSON via prompt engineering) ────── + + // Script an assistant text_delta followed by agent_end so the bridge has + // buffered content to parse when outputFormat is set. + function scriptedAssistantThenEnd(text: string): FakeEvent[] { + return [ + { + type: 'message_update', + message: { role: 'assistant' } as never, + assistantMessageEvent: { + type: 'text_delta', + contentIndex: 0, + delta: text, + partial: { role: 'assistant' } as never, + }, + }, + ...scriptedAgentEnd(), + ]; + } + + test('outputFormat: schema is appended to prompt as JSON instruction', async () => { + process.env.GEMINI_API_KEY = 'sk-test'; + resetScript(scriptedAgentEnd()); + + await consume( + new PiProvider().sendQuery('Summarize this bug.', '/tmp', undefined, { + model: 'google/gemini-2.5-pro', + outputFormat: { + type: 'json_schema', + schema: { type: 'object', properties: { area: { type: 'string' } } }, + }, + }) + ); + + // Prompt should now contain the original instruction + the schema hint. + expect(mockPrompt).toHaveBeenCalled(); + const [sentPrompt] = mockPrompt.mock.calls[0] as [string]; + expect(sentPrompt).toContain('Summarize this bug.'); + expect(sentPrompt).toContain('Respond with ONLY a JSON object'); + expect(sentPrompt).toContain('"area"'); + }); + + test('outputFormat: absent → prompt passed through unchanged', async () => { + process.env.GEMINI_API_KEY = 'sk-test'; + resetScript(scriptedAgentEnd()); + + await consume( + new PiProvider().sendQuery('do a thing', '/tmp', undefined, { + model: 'google/gemini-2.5-pro', + }) + ); + + const [sentPrompt] = mockPrompt.mock.calls[0] as [string]; + expect(sentPrompt).toBe('do a thing'); + expect(sentPrompt).not.toContain('JSON'); + }); + + test('outputFormat: result chunk carries parsed structuredOutput on clean JSON', async () => { + process.env.GEMINI_API_KEY = 'sk-test'; + resetScript(scriptedAssistantThenEnd('{"area":"web","confidence":0.9}')); + + const { chunks } = await consume( + new PiProvider().sendQuery('classify', '/tmp', undefined, { + model: 'google/gemini-2.5-pro', + outputFormat: { + type: 'json_schema', + schema: { type: 'object' }, + }, + }) + ); + + const result = chunks.find( + (c): c is { type: 'result'; structuredOutput?: unknown } => + typeof c === 'object' && c !== null && (c as { type?: string }).type === 'result' + ); + expect(result).toBeDefined(); + expect(result?.structuredOutput).toEqual({ area: 'web', confidence: 0.9 }); + }); + + test('outputFormat: fenced JSON (```json ... ```) still parses', async () => { + process.env.GEMINI_API_KEY = 'sk-test'; + resetScript(scriptedAssistantThenEnd('```json\n{"ok":true}\n```')); + + const { chunks } = await consume( + new PiProvider().sendQuery('x', '/tmp', undefined, { + model: 'google/gemini-2.5-pro', + outputFormat: { type: 'json_schema', schema: {} }, + }) + ); + + const result = chunks.find( + (c): c is { type: 'result'; structuredOutput?: unknown } => + typeof c === 'object' && c !== null && (c as { type?: string }).type === 'result' + ); + expect(result?.structuredOutput).toEqual({ ok: true }); + }); + + test('outputFormat: prose-wrapped JSON → no structuredOutput, degrades cleanly', async () => { + process.env.GEMINI_API_KEY = 'sk-test'; + resetScript(scriptedAssistantThenEnd('Here is the JSON:\n{"ok":true}\nHope this helps!')); + + const { chunks, error } = await consume( + new PiProvider().sendQuery('x', '/tmp', undefined, { + model: 'google/gemini-2.5-pro', + outputFormat: { type: 'json_schema', schema: {} }, + }) + ); + + // No crash — downstream degradation is the executor's job via its + // existing dag.structured_output_missing warning path. + expect(error).toBeUndefined(); + const result = chunks.find( + (c): c is { type: 'result'; structuredOutput?: unknown } => + typeof c === 'object' && c !== null && (c as { type?: string }).type === 'result' + ); + expect(result).toBeDefined(); + expect(result?.structuredOutput).toBeUndefined(); + }); + + test('no outputFormat → structuredOutput never set even if assistant emits JSON', async () => { + process.env.GEMINI_API_KEY = 'sk-test'; + resetScript(scriptedAssistantThenEnd('{"accidental":"json"}')); + + const { chunks } = await consume( + new PiProvider().sendQuery('x', '/tmp', undefined, { + model: 'google/gemini-2.5-pro', + }) + ); + + const result = chunks.find( + (c): c is { type: 'result'; structuredOutput?: unknown } => + typeof c === 'object' && c !== null && (c as { type?: string }).type === 'result' + ); + expect(result?.structuredOutput).toBeUndefined(); + }); }); diff --git a/packages/providers/src/community/pi/provider.ts b/packages/providers/src/community/pi/provider.ts index ccc2511d..6ced9942 100644 --- a/packages/providers/src/community/pi/provider.ts +++ b/packages/providers/src/community/pi/provider.ts @@ -64,6 +64,30 @@ function lookupPiModel(provider: string, modelId: string): Model | undefine ); } +/** + * Append a "respond with JSON matching this schema" instruction to the user + * prompt so Pi-backed models produce parseable structured output. Pi's SDK + * has no JSON-mode equivalent to Claude's outputFormat or Codex's + * outputSchema, so this is a best-effort fallback: the event bridge parses + * the assistant transcript on agent_end. Models that reliably follow + * instruction (GPT-5, Claude, Gemini 2.x, recent Qwen Coder, DeepSeek V3) + * return clean JSON; models that don't produce a parse failure, which the + * executor surfaces via the existing dag.structured_output_missing warning. + */ +export function augmentPromptForJsonSchema( + prompt: string, + schema: Record +): string { + return `${prompt} + +--- + +CRITICAL: Respond with ONLY a JSON object matching the schema below. No prose before or after the JSON. No markdown code fences. Just the raw JSON object as your final message. + +Schema: +${JSON.stringify(schema, null, 2)}`; +} + /** * Pi community provider — wraps `@mariozechner/pi-coding-agent`'s full * coding-agent harness. Each `sendQuery()` call creates a fresh session @@ -257,10 +281,26 @@ export class PiProvider implements IAgentProvider { yield { type: 'system', content: `⚠️ ${modelFallbackMessage}` }; } - // 5. Bridge callback-based events to the async generator contract. + // 5. Structured output (best-effort). Pi has no SDK-level JSON schema + // mode the way Claude and Codex do, so we implement it via prompt + // engineering: append the schema + "JSON only, no fences" instruction, + // and have the bridge parse the accumulated assistant text on + // agent_end. Parse failures degrade gracefully — the executor's + // existing dag.structured_output_missing warning path handles them. + const outputFormat = requestOptions?.outputFormat; + const effectivePrompt = outputFormat + ? augmentPromptForJsonSchema(prompt, outputFormat.schema) + : prompt; + + // 6. Bridge callback-based events to the async generator contract. // bridgeSession owns dispose() and abort wiring. try { - yield* bridgeSession(session, prompt, requestOptions?.abortSignal); + yield* bridgeSession( + session, + effectivePrompt, + requestOptions?.abortSignal, + outputFormat?.schema + ); getLog().info({ piProvider: parsed.provider }, 'pi.prompt_completed'); } catch (err) { getLog().error({ err, piProvider: parsed.provider }, 'pi.prompt_failed'); diff --git a/packages/providers/src/registry.test.ts b/packages/providers/src/registry.test.ts index 6fee3a65..64b879a9 100644 --- a/packages/providers/src/registry.test.ts +++ b/packages/providers/src/registry.test.ts @@ -304,7 +304,7 @@ describe('registry', () => { expect(piEntries).toHaveLength(1); }); - test('declares v2 capabilities (thinking, effort, tools, skills, sessionResume, envInjection supported)', () => { + test('declares v2 capabilities (thinking, effort, tools, skills, sessionResume, envInjection, structuredOutput supported)', () => { registerPiProvider(); const caps = getProviderCapabilities('pi'); // Flipped true in v2 @@ -314,10 +314,12 @@ describe('registry', () => { expect(caps.skills).toBe(true); expect(caps.sessionResume).toBe(true); expect(caps.envInjection).toBe(true); + // Best-effort structured output via prompt engineering + post-parse — + // not SDK-enforced like Claude/Codex, but wired up and tested. + expect(caps.structuredOutput).toBe(true); // Still false (out of v2 scope) expect(caps.mcp).toBe(false); expect(caps.hooks).toBe(false); - expect(caps.structuredOutput).toBe(false); expect(caps.costControl).toBe(false); expect(caps.fallbackModel).toBe(false); expect(caps.sandbox).toBe(false);