diff --git a/.changeset/custom-agent-loop-fixes.md b/.changeset/custom-agent-loop-fixes.md new file mode 100644 index 0000000000..4d37fff535 --- /dev/null +++ b/.changeset/custom-agent-loop-fixes.md @@ -0,0 +1,9 @@ +--- +"@trigger.dev/sdk": patch +--- + +Three fixes for custom agent loops (`chat.customAgent`, `chat.createSession`, and hand-rolled `MessageAccumulator` loops): + +- Continuation runs no longer replay already-answered user messages into the first turn. The `.in` resume cursor is now seeded before any listener attaches (the same boot logic `chat.agent` uses), so a chat that continues after a cancel, crash, or upgrade only sees genuinely new messages. +- Steering a hand-rolled loop mid-stream no longer wipes the in-flight assistant response. `chat.pipeAndCapture` now stamps a server-generated message id on the stream, so a `prepareStep` injection keeps the partial text instead of replacing the message. +- Task-backed tools (`ai.toolExecute`) now work from custom agent loops: the parent's session is threaded to the child run, so child tasks can stream progress into the chat with `chat.stream.writer({ target: "root" })` instead of failing with "session handle is not initialized". diff --git a/packages/trigger-sdk/src/v3/ai.ts b/packages/trigger-sdk/src/v3/ai.ts index e3b3e60549..0d0caf7c96 100644 --- a/packages/trigger-sdk/src/v3/ai.ts +++ b/packages/trigger-sdk/src/v3/ai.ts @@ -160,6 +160,10 @@ const chatTurnContextKey = locals.create("chat.turnContext"); * @internal */ const chatSessionHandleKey = locals.create("chat.sessionHandle"); +// The external `chatId` from the boot payload — the value `ToolCallExecutionOptions.chatId` +// is documented to carry. Custom-agent loops never set per-turn context, so subtask tool +// metadata reads this directly rather than the Session handle id. +const chatExternalIdKey = locals.create("chat.externalId"); /** * S2 seq_num of the most recent `turn-complete` control record written by @@ -221,6 +225,47 @@ export async function __findLatestSessionInCursorForTests( return findLatestSessionInCursor(chatId); } +/** + * Seed the `.in` resume cursor for custom-agent loops (`chat.customAgent` + * raw loops and `chat.createSession`) the way `chat.agent`'s boot does. + * + * MUST run before anything attaches a `.in` listener (`createStopSignal`, + * `chat.messages.on`, the first wait): attaching opens the SSE tail with + * `Last-Event-ID` from the seeded cursor, so attach-then-seed replays + * every record from seq 0 — already-answered user messages get delivered + * into the new run's first wait and the loop re-answers them. + * + * Seeds both cursors: `setLastSeqNum` controls the SSE `Last-Event-ID`, + * `setLastDispatchedSeqNum` gates waiter dispatch — seeding only the + * former still re-delivers records the manager buffered before the seed. + * + * No-ops on fresh boots and when a cursor is already seeded (e.g. the + * `chatCustomAgent` wrapper ran before a nested `createChatSession`). + * @internal + */ +async function seedSessionInResumeCursorForCustomLoop( + payload: Pick +): Promise { + if (sessionStreams.lastSeqNum(payload.chatId, "in") !== undefined) return; + // No continuation/attempt gate: the wire may omit `continuation` on a + // run that still has prior turns (chat.agent covers that case via its + // snapshot). The scan doubles as the prior-state probe — a fresh + // session has no turn-complete on `.out`, returns no cursor, and + // seeds nothing. Cost on fresh boots is one non-blocking records read. + try { + const cursor = await findLatestSessionInCursor(payload.chatId); + if (cursor !== undefined) { + sessionStreams.setLastSeqNum(payload.chatId, "in", cursor); + sessionStreams.setLastDispatchedSeqNum(payload.chatId, "in", cursor); + } + } catch (error) { + logger.warn( + "chat session: session.in resume cursor lookup failed; old messages may replay", + { error: error instanceof Error ? error.message : String(error) } + ); + } +} + /** * Versioned blob written to S3 after every turn completes (when no * `hydrateMessages` hook is registered). Read at run boot to seed the @@ -921,6 +966,15 @@ function createTaskToolExecuteHandler< toolMeta.turn = chatCtx.turn; toolMeta.continuation = chatCtx.continuation; toolMeta.clientData = chatCtx.clientData; + } else { + // Hand-rolled chat.customAgent loops never set per-turn context, but + // the wrapper records the boot payload's external chatId at run boot + // — thread it so subtask chat helpers (`chat.stream.writer` with + // target "root") can open the parent's session. + const chatExternalId = locals.get(chatExternalIdKey); + if (chatExternalId) { + toolMeta.chatId = chatExternalId; + } } const chatLocals: Record = {}; @@ -5104,6 +5158,7 @@ function chatCustomAgent< // `chat.createStartSessionAction`) before this run is triggered. // No client-side upsert needed. locals.set(chatSessionHandleKey, sessions.open(payload.chatId)); + locals.set(chatExternalIdKey, payload.chatId); locals.set(chatAgentRunContextKey, runOptions.ctx); // Initialize the turn-complete trim slot so `chat.writeTurnComplete` // trims `session.out` back to the previous turn boundary. Without @@ -5113,6 +5168,10 @@ function chatCustomAgent< markChatAgentRunForStreamsWarning(); taskContext.setConversationId(payload.chatId); stampConversationIdOnActiveSpan(payload.chatId); + // Seed the `.in` resume cursor before user code attaches any `.in` + // listener — otherwise a continuation boot replays already-answered + // messages into the loop's first wait. + await seedSessionInResumeCursorForCustomLoop(payload); return userRun(payload, runOptions); }, }); @@ -5213,6 +5272,7 @@ function chatAgent< // `chat.createStartSessionAction` or browser-direct) before this // run is triggered — no client-side upsert needed here. locals.set(chatSessionHandleKey, sessions.open(payload.chatId)); + locals.set(chatExternalIdKey, payload.chatId); // Mutable holder; advances in `writeTurnCompleteChunk` after each turn // and is the trim target for the NEXT turn's trim record. locals.set(lastTurnCompleteSeqNumKey, { value: undefined }); @@ -8613,8 +8673,15 @@ async function pipeChatAndCapture( resolveOnFinish = r; }); + const resolvedOptions = resolveUIMessageStreamOptions(); const uiStream = source.toUIMessageStream({ - ...resolveUIMessageStreamOptions(), + ...resolvedOptions, + // Stamp a server-generated id on the start chunk, same as chat.agent's + // pipe. Without it the AI SDK regenerates the assistant id when a + // prepareStep injection (steering) starts a new step mid-stream, and + // the frontend replaces the partial message — wiping the + // pre-injection text from the UI and the captured response. + generateMessageId: resolvedOptions.generateMessageId ?? generateMessageId, onFinish: ({ responseMessage }: { responseMessage: UIMessage }) => { captured = responseMessage; resolveOnFinish!(); @@ -8936,14 +9003,18 @@ export type ChatTurn = { * signaling, and idle/suspend between turns. You control: initialization, * model/tool selection, persistence, and any custom per-turn logic. * + * Call from inside a `chat.customAgent()` run — the wrapper binds the + * backing Session that the iterator's stop signal and message channels + * resolve to. (A plain `task()` does not bind it, so `createSession` + * would throw "session handle is not initialized".) + * * @example * ```ts - * import { task } from "@trigger.dev/sdk"; * import { chat, type ChatTaskWirePayload } from "@trigger.dev/sdk/ai"; * import { streamText } from "ai"; * import { openai } from "@ai-sdk/openai"; * - * export const myChat = task({ + * export const myChat = chat.customAgent({ * id: "my-chat", * run: async (payload: ChatTaskWirePayload, { signal }) => { * const session = chat.createSession(payload, { signal }); @@ -8979,13 +9050,23 @@ function createChatSession( [Symbol.asyncIterator]() { let currentPayload = payload; let turn = -1; - const stop = createStopSignal(); + // Created on the first next() call, AFTER the resume-cursor seed — + // createStopSignal attaches the `.in` SSE tail, and attaching + // before the seed replays every record from seq 0 (the seed is a + // no-op when the chatCustomAgent wrapper already ran it). + let stop!: ReturnType; + let booted = false; const accumulator = new ChatMessageAccumulator(); let previousTurnUsage: LanguageModelUsage | undefined; let cumulativeUsage: LanguageModelUsage = emptyUsage(); return { async next(): Promise> { + if (!booted) { + booted = true; + await seedSessionInResumeCursorForCustomLoop(currentPayload); + stop = createStopSignal(); + } turn++; // First turn: wait when the boot payload carries no message. @@ -9328,7 +9409,8 @@ function createChatSession( }, async return() { - stop.cleanup(); + // `stop` only exists once next() has booted the iterator. + stop?.cleanup(); return { done: true, value: undefined }; }, };