feat(cli): add lh notify command for external agent callbacks (#13664)

*  feat(cli): add `lh notify` command for external agent callbacks

Add a new `lh notify` CLI command and server-side TRPC endpoint that allows
external agents (e.g. Claude Code) to send callback messages to a topic and
trigger the agent loop to process them.

Fixes LOBE-6888

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* 🔧 chore(cli): replace sessionId with agentId and threadId in notify command

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Arvin Xu 2026-04-08 18:03:55 +08:00 committed by GitHub
parent b6a47debfd
commit c68dfa00df
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 142 additions and 0 deletions

View file

@ -0,0 +1,51 @@
import type { Command } from 'commander';
import pc from 'picocolors';
import { getTrpcClient } from '../api/client';
import { log } from '../utils/logger';
export function registerNotifyCommand(program: Command) {
program
.command('notify')
.description('Send a callback message to a topic and trigger the agent to process it')
.requiredOption('--topic <topicId>', 'Target topic ID')
.requiredOption('-c, --content <content>', 'Message content')
.option('--agent-id <agentId>', 'Agent ID (overrides topic default)')
.option('--thread-id <threadId>', 'Thread ID for threaded conversations')
.option('--json', 'Output JSON')
.action(
async (options: {
agentId?: string;
content: string;
json?: boolean;
threadId?: string;
topic: string;
}) => {
log.debug('notify: topic=%s, agentId=%s', options.topic, options.agentId);
const client = await getTrpcClient();
try {
const result = await client.agentNotify.notify.mutate({
agentId: options.agentId,
content: options.content,
threadId: options.threadId,
topicId: options.topic,
});
if (options.json) {
console.log(JSON.stringify(result, null, 2));
return;
}
console.log(`${pc.green('✓')} Message sent to topic ${pc.bold(result.topicId)}`);
if (result.operationId) {
console.log(` Operation ID: ${result.operationId}`);
}
} catch (error: any) {
console.error(`${pc.red('✗')} Failed to send notification: ${error.message}`);
process.exit(1);
}
},
);
}

View file

@ -22,6 +22,7 @@ import { registerMemoryCommand } from './commands/memory';
import { registerMessageCommand } from './commands/message';
import { registerMigrateCommand } from './commands/migrate';
import { registerModelCommand } from './commands/model';
import { registerNotifyCommand } from './commands/notify';
import { registerPluginCommand } from './commands/plugin';
import { registerProviderCommand } from './commands/provider';
import { registerSearchCommand } from './commands/search';
@ -68,6 +69,7 @@ export function createProgram() {
registerTopicCommand(program);
registerMessageCommand(program);
registerModelCommand(program);
registerNotifyCommand(program);
registerProviderCommand(program);
registerPluginCommand(program);
registerUserCommand(program);

View file

@ -0,0 +1,87 @@
import { TRPCError } from '@trpc/server';
import debug from 'debug';
import { z } from 'zod';
import { TopicModel } from '@/database/models/topic';
import { authedProcedure, router } from '@/libs/trpc/lambda';
import { serverDatabase } from '@/libs/trpc/lambda/middleware';
import { AiAgentService } from '@/server/services/aiAgent';
const log = debug('lobe-server:agent-notify-router');
const agentNotifyProcedure = authedProcedure.use(serverDatabase).use(async (opts) => {
const { ctx } = opts;
return opts.next({
ctx: {
aiAgentService: new AiAgentService(ctx.serverDB, ctx.userId),
topicModel: new TopicModel(ctx.serverDB, ctx.userId),
},
});
});
const NotifySchema = z.object({
/** Agent ID to trigger (overrides the topic's default agent) */
agentId: z.string().optional(),
/** Message content from the external agent */
content: z.string(),
/** Thread ID for threaded conversations */
threadId: z.string().optional(),
/** Topic ID to send the message to */
topicId: z.string(),
});
export const agentNotifyRouter = router({
/**
* Receive a callback message from an external agent (e.g. Claude Code),
* write it into a topic, and trigger the agent loop to process it.
*/
notify: agentNotifyProcedure.input(NotifySchema).mutation(async ({ input, ctx }) => {
const { topicId, content, agentId: inputAgentId, threadId } = input;
log('notify: topicId=%s, agentId=%s, content=%s', topicId, inputAgentId, content.slice(0, 80));
// 1. Verify the topic exists and get its agentId
const topic = await ctx.topicModel.findById(topicId);
if (!topic) {
throw new TRPCError({
code: 'NOT_FOUND',
message: `Topic ${topicId} not found`,
});
}
const agentId = inputAgentId ?? topic.agentId;
if (!agentId) {
throw new TRPCError({
code: 'BAD_REQUEST',
message: `Topic ${topicId} has no associated agent and no agentId was provided`,
});
}
// 2. Trigger the agent loop (execAgent handles message creation internally)
try {
const result = await ctx.aiAgentService.execAgent({
agentId,
appContext: { threadId, topicId },
prompt: content,
});
return {
operationId: result.operationId,
topicId,
};
} catch (error: any) {
console.error('agentNotify execAgent failed: %O', error);
if (error instanceof TRPCError) {
throw error;
}
throw new TRPCError({
cause: error,
code: 'INTERNAL_SERVER_ERROR',
message: `Failed to trigger agent: ${error.message}`,
});
}
}),
});

View file

@ -15,6 +15,7 @@ import { agentDocumentRouter } from './agentDocument';
import { agentEvalRouter } from './agentEval';
import { agentEvalExternalRouter } from './agentEvalExternal';
import { agentGroupRouter } from './agentGroup';
import { agentNotifyRouter } from './agentNotify';
import { agentSkillsRouter } from './agentSkills';
import { aiAgentRouter } from './aiAgent';
import { aiChatRouter } from './aiChat';
@ -63,6 +64,7 @@ import { videoRouter } from './video';
export const lambdaRouter = router({
agent: agentRouter,
agentBotProvider: agentBotProviderRouter,
agentNotify: agentNotifyRouter,
botMessage: botMessageRouter,
agentCronJob: agentCronJobRouter,
agentDocument: agentDocumentRouter,