Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ async def on_task_create(self, params: CreateTaskParams) -> None:
# 2. Wait for the task to be completed indefinitely. If we don't do this the workflow will close as soon as this function returns. Temporal can run hundreds of millions of workflows in parallel, so you don't need to worry about too many workflows running at once.

# Thus, if you want this agent to field events indefinitely (or for a long time) you need to wait for a condition to be met.
await workflow.wait_condition(
lambda: self._complete_task,
timeout=None, # Set a timeout if you want to prevent the task from running indefinitely. Generally this is not needed. Temporal can run hundreds of millions of workflows in parallel and more. Only do this if you have a specific reason to do so.
)
# run_until_complete behaves exactly like the old indefinite wait_condition
# by default; when WORKFLOW_CONTINUE_AS_NEW_ENABLED is set it also recycles
# event history via continue-as-new before Temporal's history limit. This
# agent keeps no cross-turn conversation state, so the only thing carried
# forward across a recycle is `params`.
await self.run_until_complete(params, is_complete=lambda: self._complete_task)
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,19 @@ def __init__(self):
self._complete_task = False
self._state: StateModel | None = None

async def _rehydrate_state(self, task_id: str) -> None:
"""Seed in-memory state, rebuilding the conversation from adk.messages.

Messages are the source of truth and live in adk.messages — NOT in workflow
state. On a brand-new task the ledger is empty (fresh start); on a continued
run (after continue-as-new) we reconstruct input_list from the ledger so the
recycle is invisible to the user. ``conversation_from_messages`` returns []
when continue-as-new is disabled, so the default path is unchanged.
"""
conversation = await self.conversation_from_messages(task_id)
turn_number = sum(1 for entry in conversation if entry["role"] == "user")
self._state = StateModel(input_list=conversation, turn_number=turn_number)

@workflow.signal(name=SignalName.RECEIVE_EVENT)
@override
async def on_task_event_send(self, params: SendEventParams) -> None:
Expand Down Expand Up @@ -253,24 +266,29 @@ async def on_task_event_send(self, params: SendEventParams) -> None:
if span and self._state:
span.output = self._state.model_dump()

# NOTE: we do NOT persist the conversation to workflow state or adk.state.
# Every user + agent message is already written to the adk.messages ledger
# (the user echo above + the agent auto-send), which is the source of truth.
# On a continue-as-new recycle we rebuild input_list from that ledger in
# _rehydrate_state — messages live in adk.messages, state in adk.state.

@workflow.run
@override
async def on_task_create(self, params: CreateTaskParams) -> None:
logger.info(f"Received task create params: {params}")

# 1. Initialize the state. You can either do this here or in the __init__ method.
# This function is triggered whenever a client creates a task for this agent.
# It is not re-triggered when a new event is sent to the task.
self._state = StateModel(
input_list=[],
turn_number=0,
)

# 2. Wait for the task to be completed indefinitely. If we don't do this the workflow will close as soon as this function returns. Temporal can run hundreds of millions of workflows in parallel, so you don't need to worry about too many workflows running at once.

# Thus, if you want this agent to field events indefinitely (or for a long time) you need to wait for a condition to be met.

await workflow.wait_condition(
lambda: self._complete_task,
timeout=None, # Set a timeout if you want to prevent the task from running indefinitely. Generally this is not needed. Temporal can run hundreds of millions of workflows in parallel and more. Only do this if you have a specific reason to do so.
)
# 1. Initialize (or, on a continued run, re-hydrate) the state. This
# function runs both when a client first creates the task AND when the
# workflow recycles itself via continue-as-new — on the latter the
# in-memory state was reset, so we reload it from the adk.messages ledger.
await self._rehydrate_state(params.task.id)

# 2. Keep the workflow open to field events. Temporal can run hundreds of
# millions of workflows in parallel, so staying open is cheap — but a single
# ever-open run accumulates event history until it hits Temporal's ~50k-event
# / 50MB limit. run_until_complete keeps the workflow open exactly like the
# old indefinite wait, and (when WORKFLOW_CONTINUE_AS_NEW_ENABLED is set)
# recycles history via continue-as-new before that limit. The conversation
# is rebuilt from adk.messages on each run, so the carry-forward is just
# `params`.
await self.run_until_complete(params, is_complete=lambda: self._complete_task)
Original file line number Diff line number Diff line change
Expand Up @@ -455,14 +455,16 @@ async def on_task_event_send(self, params: SendEventParams) -> None:
async def on_task_create(self, params: CreateTaskParams) -> None:
logger.info(f"Received task create params: {params}")

# 1. Initialize the state. You can either do this here or in the
# __init__ method. This function is triggered whenever a client
# creates a task for this agent. It is not re-triggered when a new
# event is sent to the task.
self._state = StateModel(
input_list=[],
turn_number=0,
)
# 1. Initialize (or, on a continued run, re-hydrate) the state. Messages
# are the source of truth and live in adk.messages — NOT in workflow
# state. On a brand-new task the ledger is empty (fresh start); on a
# continued run (after continue-as-new) we reconstruct input_list from
# the ledger so the recycle is invisible to the user.
# conversation_from_messages returns [] when continue-as-new is disabled,
# so the default path is unchanged.
conversation = await self.conversation_from_messages(params.task.id)
turn_number = sum(1 for entry in conversation if entry["role"] == "user")
self._state = StateModel(input_list=conversation, turn_number=turn_number)

# 2. Wait for the task to be completed indefinitely. If we don't do
# this the workflow will close as soon as this function returns.
Expand All @@ -472,10 +474,9 @@ async def on_task_create(self, params: CreateTaskParams) -> None:
# Thus, if you want this agent to field events indefinitely (or for
# a long time) you need to wait for a condition to be met.

await workflow.wait_condition(
lambda: self._complete_task,
timeout=None, # Set a timeout if you want to prevent the task
# from running indefinitely. Generally this is not needed.
# Temporal can run hundreds of millions of workflows in parallel
# and more. Only do this if you have a specific reason to do so.
)
# run_until_complete keeps the workflow open exactly like the old
# indefinite wait, and (when WORKFLOW_CONTINUE_AS_NEW_ENABLED is set)
# recycles event history via continue-as-new before Temporal's
# ~50k-event / 50MB limit. The conversation is rebuilt from adk.messages
# on each run, so the carry-forward is just `params`.
await self.run_until_complete(params, is_complete=lambda: self._complete_task)
Original file line number Diff line number Diff line change
Expand Up @@ -258,12 +258,16 @@ async def on_task_create(self, params: CreateTaskParams) -> str:
# ============================================================================
# WORKFLOW INITIALIZATION: Initialize State
# ============================================================================
# Initialize the conversation state with an empty history
# This will be populated as the conversation progresses
self._state = StateModel(
input_list=[],
turn_number=0,
)
# Initialize (or, on a continued run, re-hydrate) the conversation state.
# Messages are the source of truth and live in adk.messages — NOT in
# workflow state. On a brand-new task the ledger is empty (fresh start);
# on a continued run (after continue-as-new) we reconstruct input_list
# from the ledger so the recycle is invisible to the user.
# conversation_from_messages returns [] when continue-as-new is disabled,
# so the default path is unchanged.
conversation = await self.conversation_from_messages(params.task.id)
turn_number = sum(1 for entry in conversation if entry["role"] == "user")
self._state = StateModel(input_list=conversation, turn_number=turn_number)

# ============================================================================
# WORKFLOW INITIALIZATION: Send Welcome Message
Expand Down Expand Up @@ -294,10 +298,12 @@ async def on_task_create(self, params: CreateTaskParams) -> str:
# - Temporal can handle millions of such concurrent workflows
# - If worker crashes, workflow resumes exactly where it left off
# - All conversation state is preserved in Temporal's event log
await workflow.wait_condition(
lambda: self._complete_task,
timeout=None, # No timeout = truly long-running agent conversation
)
# run_until_complete keeps the workflow open exactly like the old
# indefinite wait, and (when WORKFLOW_CONTINUE_AS_NEW_ENABLED is set)
# recycles event history via continue-as-new before Temporal's
# ~50k-event / 50MB limit. The conversation is rebuilt from adk.messages
# on each run, so the carry-forward is just `params`.
await self.run_until_complete(params, is_complete=lambda: self._complete_task)
return "Agent conversation completed"

@workflow.signal
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,10 +331,17 @@ async def on_task_event_send(self, params: SendEventParams) -> None:
async def on_task_create(self, params: CreateTaskParams) -> str:
logger.info(f"Received task create params: {params}")

# Initialize the conversation state with an empty history
# Initialize (or, on a continued run, re-hydrate) the conversation state.
# This runs both when a client first creates the task AND when the workflow
# recycles itself via continue-as-new — on the latter the in-memory state was
# reset, so we rebuild input_list from the adk.messages ledger (the source of
# truth). conversation_from_messages returns [] when continue-as-new is
# disabled, so the default path is an empty fresh start as before.
conversation = await self.conversation_from_messages(params.task.id)
turn_number = sum(1 for entry in conversation if entry["role"] == "user")
self._state = StateModel(
input_list=[],
turn_number=0,
input_list=conversation,
turn_number=turn_number,
)

# 1. Acknowledge that the task has been created.
Expand All @@ -346,10 +353,13 @@ async def on_task_create(self, params: CreateTaskParams) -> str:
),
)

await workflow.wait_condition(
lambda: self._complete_task,
timeout=None, # Set a timeout if you want to prevent the task from running indefinitely. Generally this is not needed. Temporal can run hundreds of millions of workflows in parallel and more. Only do this if you have a specific reason to do so.
)
# Keep the workflow open to field events. run_until_complete behaves exactly
# like the old indefinite wait_condition, and (when
# WORKFLOW_CONTINUE_AS_NEW_ENABLED is set) recycles event history via
# continue-as-new before Temporal's ~50k-event / 50MB limit. Messages live in
# adk.messages and are rebuilt above, so the only state carried forward is
# params.
await self.run_until_complete(params, is_complete=lambda: self._complete_task)
return "Task completed"

@workflow.signal
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,10 +216,24 @@ async def on_task_create(self, params: CreateTaskParams) -> str:
"""
logger.info(f"Received task create params: {params}")

# Initialize the conversation state with an empty history
# Initialize (or, on a continued run, re-hydrate) the conversation state.
# This runs both when a client first creates the task AND when the workflow
# recycles itself via continue-as-new — on the latter the in-memory state was
# reset, so we rebuild input_list from the adk.messages ledger (the source of
# truth). conversation_from_messages returns [] when continue-as-new is
# disabled, so the default path is an empty fresh start as before.
#
# The only cross-turn parent state here is input_list/turn_number; the
# human-approval state lives entirely in the child workflow spawned by
# wait_for_confirmation (it waits for fulfill_order_signal), and an in-flight
# approval turn keeps on_task_event_send running — run_until_complete drains
# all in-flight handlers before recycling, so it is never cut off. Hence
# conversation_from_messages alone is enough; no adk.state is needed.
conversation = await self.conversation_from_messages(params.task.id)
turn_number = sum(1 for entry in conversation if entry["role"] == "user")
self._state = StateModel(
input_list=[],
turn_number=0,
input_list=conversation,
turn_number=turn_number,
)

# Send welcome message when task is created
Expand All @@ -231,12 +245,13 @@ async def on_task_create(self, params: CreateTaskParams) -> str:
),
)

# Keep workflow running indefinitely to handle user messages and human approvals
# This survives system failures and can resume exactly where it left off
await workflow.wait_condition(
lambda: self._complete_task,
timeout=None, # No timeout for long-running human-in-the-loop workflows
)
# Keep the workflow open to handle user messages and human approvals.
# run_until_complete behaves exactly like the old indefinite wait_condition,
# and (when WORKFLOW_CONTINUE_AS_NEW_ENABLED is set) recycles event history
# via continue-as-new before Temporal's ~50k-event / 50MB limit, draining any
# in-flight approval turn first. Messages live in adk.messages and are rebuilt
# above, so the only state carried forward is params.
await self.run_until_complete(params, is_complete=lambda: self._complete_task)
return "Task completed"

# TEMPORAL UI (localhost:8080):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,64 @@ def __init__(self):
self._trace_id = None
self._parent_span_id = None
self._workspace_path = None
# adk.state row id for the opaque session handle (continue-as-new only).
self._state_id: str | None = None

async def _rehydrate_state(self, task_id: str) -> None:
"""Initialize state, restoring the Claude session handle on a continued run.

``claude_session_id`` is an opaque CLI handle that maintains conversation
context — it CANNOT be rebuilt from adk.messages, so to survive a
continue-as-new recycle it is persisted to adk.state (the sanctioned home
for non-message state). When continue-as-new is disabled (the default) this
makes ZERO extra activity calls and behaves exactly like the old fresh init.
"""
if self._continue_as_new_enabled and environment_variables.AGENT_ID:
existing = await adk.state.get_by_task_and_agent(
task_id=task_id,
agent_id=environment_variables.AGENT_ID,
)
if existing is not None:
self._state_id = existing.id
self._state = StateModel(
claude_session_id=existing.state.get("claude_session_id"),
turn_number=existing.state.get("turn_number", 0),
)
return
# Fresh start (default path).
self._state = StateModel(
claude_session_id=None,
turn_number=0,
)

async def _persist_state(self, task_id: str) -> None:
"""Persist the opaque Claude session handle + turn number to adk.state.

Guarded by continue-as-new + AGENT_ID so the default path makes ZERO extra
activity calls. Creates the row on first write, updates it thereafter.
"""
if not (self._continue_as_new_enabled and environment_variables.AGENT_ID):
return
if self._state is None:
return
state_payload = {
"claude_session_id": self._state.claude_session_id,
"turn_number": self._state.turn_number,
}
if self._state_id is None:
created = await adk.state.create(
task_id=task_id,
agent_id=environment_variables.AGENT_ID,
state=state_payload,
)
self._state_id = created.id
else:
await adk.state.update(
state_id=self._state_id,
task_id=task_id,
agent_id=environment_variables.AGENT_ID,
state=state_payload,
)

@workflow.signal(name=SignalName.RECEIVE_EVENT)
async def on_task_event_send(self, params: SendEventParams):
Expand Down Expand Up @@ -171,6 +229,10 @@ async def on_task_event_send(self, params: SendEventParams):
# Response already streamed to UI by activity - no need to send again
logger.debug(f"Turn {self._state.turn_number} completed successfully")

# Persist the opaque session handle so it survives a
# continue-as-new recycle (no-op when the feature is disabled).
await self._persist_state(params.task.id)

except Exception as e:
logger.error(f"Error running Claude agent: {e}", exc_info=True)
# Send error message to user
Expand All @@ -189,11 +251,9 @@ async def on_task_create(self, params: CreateTaskParams):

logger.info(f"Creating Claude MVP workflow for task: {params.task.id}")

# Initialize state with session tracking
self._state = StateModel(
claude_session_id=None,
turn_number=0,
)
# Initialize state, or (on a continue-as-new recycle) re-hydrate the
# opaque Claude session handle from adk.state. Default path is a fresh init.
await self._rehydrate_state(params.task.id)

# Create workspace via activity (avoids determinism issues with file I/O)
workspace_root = os.environ.get("CLAUDE_WORKSPACE_ROOT")
Expand Down Expand Up @@ -223,12 +283,12 @@ async def on_task_create(self, params: CreateTaskParams):
)
)

# Wait for completion signal
# Keep the workflow open to field events. run_until_complete behaves
# exactly like the old indefinite wait unless continue-as-new is enabled,
# in which case it recycles event history before Temporal's limit. The
# opaque session handle is carried across recycles via adk.state.
logger.info("Waiting for task completion...")
await workflow.wait_condition(
lambda: self._complete_task,
timeout=None, # Long-running workflow
)
await self.run_until_complete(params, is_complete=lambda: self._complete_task)

logger.info("Claude MVP workflow completed")
return "Task completed successfully"
Expand Down
Loading
Loading