Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions projects/ai-chat/src/app/api/chat-hydrated/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/**
* chat.headStart first-turn endpoint for the `ai-chat-hydrated` agent.
*
* Same shape as `/api/chat` (see the header comment there for the
* full head-start mechanics) but handing over to the hydrateMessages
* agent. This is the smoke-test surface for the headStart ×
* hydrateMessages combination: the agent owns history via its DB-backed
* `hydrateMessages` hook, and the turn-0 handover splice must still
* deliver the warm handler's step-1 partial (text, tool calls, stable
* messageId) into the agent's accumulator.
*/
import { chat } from "@trigger.dev/sdk/chat-server";
import { streamText } from "ai";
import { anthropic } from "@ai-sdk/anthropic";
// ⚠️ Imports MUST come from `chat-tools-schemas` only — see the
// header comment in that file for the bundle-isolation rationale.
import { headStartTools } from "@/lib/chat-tools-schemas";

export const POST = chat.headStart({
agentId: "ai-chat-hydrated",
run: async ({ chat: chatHelper }) => {
return streamText({
...chatHelper.toStreamTextOptions({ tools: headStartTools }),
model: anthropic("claude-sonnet-4-6"),
system:
"You are a helpful AI assistant. Be concise and friendly. Use the available tools when relevant.",
// Extended thinking so head-start smoke tests cover reasoning
// parts surviving the handover into durable history.
providerOptions: {
anthropic: { thinking: { type: "enabled", budgetTokens: 2048 } },
},
});
},
});
8 changes: 7 additions & 1 deletion projects/ai-chat/src/components/chat-view.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,13 @@ export function ChatView({
// session state from response headers and writes directly to
// `session.in` for turn 2 onward — same direct-trigger path as
// when `headStart` is unset.
headStart: useHandover ? "/api/chat" : undefined,
// The hydrated agent has its own head-start route so the
// headStart × hydrateMessages combination is testable end-to-end.
headStart: useHandover
? taskMode === "ai-chat-hydrated"
? "/api/chat-hydrated"
: "/api/chat"
: undefined,
});

const handleFirstMessage = useCallback(
Expand Down
68 changes: 58 additions & 10 deletions projects/ai-chat/src/trigger/chat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -674,14 +674,25 @@ export const aiChatRaw = chat.customAgent({
},
});

// Continuation boots (fresh run) start with an empty accumulator —
// fetch stored history so turn 0 can seed it. Seeding must go THROUGH
// addIncoming: turn 0 replaces the accumulator, so a setMessages
// before the loop would be wiped.
let continuationSeed: UIMessage[] = [];
if (currentPayload.continuation) {
const row = await prisma.chat.findUnique({ where: { id: currentPayload.chatId } });
continuationSeed = (row?.messages ?? []) as unknown as UIMessage[];
}

for (let turn = 0; turn < 100; turn++) {
stop.reset();

const messages = await conversation.addIncoming(
currentPayload.message ? [currentPayload.message] : [],
currentPayload.trigger,
turn
);
const incoming = currentPayload.message ? [currentPayload.message] : [];
const turnInput =
turn === 0 && continuationSeed.length > 0
? [...continuationSeed.filter((s) => !incoming.some((m) => m.id === s.id)), ...incoming]
: incoming;
const messages = await conversation.addIncoming(turnInput, currentPayload.trigger, turn);

const turnClientData = (currentPayload.metadata ?? currentClientData) as
| { userId: string; model?: string }
Expand All @@ -696,6 +707,13 @@ export const aiChatRaw = chat.customAgent({
const useReasoning = useExtendedThinking(modelOverride);
const combinedSignal = AbortSignal.any([runSignal, stop.signal]);

// Persist the incoming user message BEFORE streaming (the
// onTurnStart-equivalent) so a mid-stream reload doesn't lose it.
await prisma.chat.update({
where: { id: currentPayload.chatId },
data: { messages: conversation.uiMessages as unknown as ChatMessagesForWrite },
});

const steeringSub = chat.messages.on(async (msg) => {
if (msg.message) await conversation.steerAsync(msg.message);
});
Expand Down Expand Up @@ -744,9 +762,13 @@ export const aiChatRaw = chat.customAgent({

if (runSignal.aborted) break;

// Race with a timeout — on stop-abort totalUsage never settles.
let turnUsage: LanguageModelUsage | undefined;
try {
turnUsage = await result.totalUsage;
turnUsage = await Promise.race([
result.totalUsage,
new Promise<undefined>((r) => setTimeout(() => r(undefined), 2000)),
]);
} catch {
/* non-fatal */
}
Expand Down Expand Up @@ -827,6 +849,17 @@ export const aiChatSession = chat
});

for await (const turn of session) {
// Continuation boots (fresh run) start with an empty accumulator —
// seed it from the DB, keeping any incoming message not yet persisted.
if (turn.continuation && turn.number === 0) {
const row = await prisma.chat.findUnique({ where: { id: turn.chatId } });
const stored = (row?.messages ?? []) as unknown as UIMessage[];
if (stored.length > 0) {
const incoming = turn.uiMessages.filter((m) => !stored.some((s) => s.id === m.id));
await turn.setMessages([...stored, ...incoming]);
}
}

const turnClientData = (turn.clientData ?? clientData) as
| { userId: string; model?: string }
| undefined;
Expand All @@ -837,6 +870,13 @@ export const aiChatSession = chat
const modelOverride = turnClientData?.model ?? userContext.preferredModel ?? undefined;
const useReasoning = useExtendedThinking(modelOverride);

// Persist the incoming user message BEFORE streaming (the
// onTurnStart-equivalent) so a mid-stream reload doesn't lose it.
await prisma.chat.update({
where: { id: turn.chatId },
data: { messages: turn.uiMessages as unknown as ChatMessagesForWrite },
});

const result = streamText({
...chat.toStreamTextOptions({ registry }),
model: languageModelForChatTurn(modelOverride),
Expand Down Expand Up @@ -904,6 +944,10 @@ export const aiChatHydrated = chat
id: "ai-chat-hydrated",
idleTimeoutInSeconds: 60,

// Same tool set as `ai-chat` so head-start tool-call handovers can
// execute post-handover (see /api/chat-hydrated).
tools: chatTools,

// Load message history from the database on every turn.
// The frontend's accumulated messages are ignored — the DB is the
// single source of truth. `upsertIncomingMessage` handles HITL
Expand All @@ -915,9 +959,12 @@ export const aiChatHydrated = chat
const stored = (record?.messages as unknown as UIMessage[]) ?? [];

if (upsertIncomingMessage(stored, { trigger, incomingMessages })) {
await prisma.chat.update({
// Upsert, not update: on a head-start first turn there's no
// preload, so the row may not exist yet when the hook fires.
await prisma.chat.upsert({
where: { id: chatId },
data: { messages: stored as unknown as ChatMessagesForWrite },
create: { id: chatId, title: "New chat", messages: stored as unknown as ChatMessagesForWrite },
update: { messages: stored as unknown as ChatMessagesForWrite },
});
}

Expand Down Expand Up @@ -1007,13 +1054,14 @@ export const aiChatHydrated = chat
]);
},

run: async ({ messages, clientData, stopSignal }) => {
run: async ({ messages, clientData, stopSignal, tools }) => {
return streamText({
...chat.toStreamTextOptions(),
...chat.toStreamTextOptions({ tools }),
model: languageModelForChatTurn(
clientData?.model ?? userContext.preferredModel ?? undefined
),
messages,
stopWhen: stepCountIs(10),
abortSignal: stopSignal,
});
},
Expand Down