Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .changeset/custom-agent-loop-fixes.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
"@trigger.dev/sdk": patch
---

Three fixes for custom agent loops (`chat.customAgent`, `chat.createSession`, and hand-rolled `MessageAccumulator` loops):

- Continuation runs no longer replay already-answered user messages into the first turn. The `.in` resume cursor is now seeded before any listener attaches (the same boot logic `chat.agent` uses), so a chat that continues after a cancel, crash, or upgrade only sees genuinely new messages.
- Steering a hand-rolled loop mid-stream no longer wipes the in-flight assistant response. `chat.pipeAndCapture` now stamps a server-generated message id on the stream, so a `prepareStep` injection keeps the partial text instead of replacing the message.
- Task-backed tools (`ai.toolExecute`) now work from custom agent loops: the parent's session is threaded to the child run, so child tasks can stream progress into the chat with `chat.stream.writer({ target: "root" })` instead of failing with "session handle is not initialized".
78 changes: 75 additions & 3 deletions packages/trigger-sdk/src/v3/ai.ts
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,47 @@ export async function __findLatestSessionInCursorForTests(
return findLatestSessionInCursor(chatId);
}

/**
* Seed the `.in` resume cursor for custom-agent loops (`chat.customAgent`
* raw loops and `chat.createSession`) the way `chat.agent`'s boot does.
*
* MUST run before anything attaches a `.in` listener (`createStopSignal`,
* `chat.messages.on`, the first wait): attaching opens the SSE tail with
* `Last-Event-ID` from the seeded cursor, so attach-then-seed replays
* every record from seq 0 — already-answered user messages get delivered
* into the new run's first wait and the loop re-answers them.
*
* Seeds both cursors: `setLastSeqNum` controls the SSE `Last-Event-ID`,
* `setLastDispatchedSeqNum` gates waiter dispatch — seeding only the
* former still re-delivers records the manager buffered before the seed.
*
* No-ops on fresh boots and when a cursor is already seeded (e.g. the
* `chatCustomAgent` wrapper ran before a nested `createChatSession`).
* @internal
*/
async function seedSessionInResumeCursorForCustomLoop(
payload: Pick<ChatTaskWirePayload, "chatId" | "continuation">
): Promise<void> {
if (sessionStreams.lastSeqNum(payload.chatId, "in") !== undefined) return;
// No continuation/attempt gate: the wire may omit `continuation` on a
// run that still has prior turns (chat.agent covers that case via its
// snapshot). The scan doubles as the prior-state probe — a fresh
// session has no turn-complete on `.out`, returns no cursor, and
// seeds nothing. Cost on fresh boots is one non-blocking records read.
try {
const cursor = await findLatestSessionInCursor(payload.chatId);
if (cursor !== undefined) {
sessionStreams.setLastSeqNum(payload.chatId, "in", cursor);
sessionStreams.setLastDispatchedSeqNum(payload.chatId, "in", cursor);
}
} catch (error) {
logger.warn(
"chat session: session.in resume cursor lookup failed; old messages may replay",
{ error: error instanceof Error ? error.message : String(error) }
);
}
}

/**
* Versioned blob written to S3 after every turn completes (when no
* `hydrateMessages` hook is registered). Read at run boot to seed the
Expand Down Expand Up @@ -921,6 +962,15 @@ function createTaskToolExecuteHandler<
toolMeta.turn = chatCtx.turn;
toolMeta.continuation = chatCtx.continuation;
toolMeta.clientData = chatCtx.clientData;
} else {
// Hand-rolled chat.customAgent loops never set per-turn context, but
// the wrapper binds the session handle at run boot — thread the
// chatId from it so subtask chat helpers (`chat.stream.writer`
// with target "root") can open the parent's session.
const sessionHandle = locals.get(chatSessionHandleKey);
if (sessionHandle) {
toolMeta.chatId = sessionHandle.id;
}
Comment on lines +965 to +973

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🗄️ Data Integrity & Integration | 🟠 Major | ⚡ Quick win

Preserve the external chatId contract here.

Line 972 copies sessionHandle.id into toolMeta.chatId. In this file, getChatSession().id is also exposed as chat.sessionId and documented as the Session friendlyId (session_*), while ToolCallExecutionOptions.chatId / ai.chatContext() are documented as the chat's external chatId. That means custom-loop subtasks will surface/store a different identifier than normal chat.agent turns. Persist the boot payload's chatId in locals and use that here instead of the Session handle id.

🛠️ Suggested direction
+const chatExternalIdKey = locals.create<string>("chat.externalId");
...
 locals.set(chatSessionHandleKey, sessions.open(payload.chatId));
+locals.set(chatExternalIdKey, payload.chatId);
...
-      const sessionHandle = locals.get(chatSessionHandleKey);
-      if (sessionHandle) {
-        toolMeta.chatId = sessionHandle.id;
+      const chatId = locals.get(chatExternalIdKey);
+      if (chatId) {
+        toolMeta.chatId = chatId;
       }

}

const chatLocals: Record<string, unknown> = {};
Expand Down Expand Up @@ -5113,6 +5163,10 @@ function chatCustomAgent<
markChatAgentRunForStreamsWarning();
taskContext.setConversationId(payload.chatId);
stampConversationIdOnActiveSpan(payload.chatId);
// Seed the `.in` resume cursor before user code attaches any `.in`
// listener — otherwise a continuation boot replays already-answered
// messages into the loop's first wait.
await seedSessionInResumeCursorForCustomLoop(payload);
return userRun(payload, runOptions);
},
});
Expand Down Expand Up @@ -8613,8 +8667,15 @@ async function pipeChatAndCapture(
resolveOnFinish = r;
});

const resolvedOptions = resolveUIMessageStreamOptions();
const uiStream = source.toUIMessageStream({
...resolveUIMessageStreamOptions(),
...resolvedOptions,
// Stamp a server-generated id on the start chunk, same as chat.agent's
// pipe. Without it the AI SDK regenerates the assistant id when a
// prepareStep injection (steering) starts a new step mid-stream, and
// the frontend replaces the partial message — wiping the
// pre-injection text from the UI and the captured response.
generateMessageId: resolvedOptions.generateMessageId ?? generateMessageId,
onFinish: ({ responseMessage }: { responseMessage: UIMessage }) => {
captured = responseMessage;
resolveOnFinish!();
Expand Down Expand Up @@ -8979,13 +9040,23 @@ function createChatSession(
[Symbol.asyncIterator]() {
let currentPayload = payload;
let turn = -1;
const stop = createStopSignal();
// Created on the first next() call, AFTER the resume-cursor seed —
// createStopSignal attaches the `.in` SSE tail, and attaching
// before the seed replays every record from seq 0 (the seed is a
// no-op when the chatCustomAgent wrapper already ran it).
let stop!: ReturnType<typeof createStopSignal>;
let booted = false;
const accumulator = new ChatMessageAccumulator();
let previousTurnUsage: LanguageModelUsage | undefined;
let cumulativeUsage: LanguageModelUsage = emptyUsage();

return {
async next(): Promise<IteratorResult<ChatTurn>> {
if (!booted) {
booted = true;
await seedSessionInResumeCursorForCustomLoop(currentPayload);
stop = createStopSignal();
}
turn++;

// First turn: wait when the boot payload carries no message.
Expand Down Expand Up @@ -9328,7 +9399,8 @@ function createChatSession(
},

async return() {
stop.cleanup();
// `stop` only exists once next() has booted the iterator.
stop?.cleanup();
return { done: true, value: undefined };
},
};
Expand Down
Loading