From 891ef6df65d0bebe1ad7aeeb327e6a1a3e827ec5 Mon Sep 17 00:00:00 2001 From: Daniel Miller Date: Wed, 24 Jun 2026 16:18:21 -0400 Subject: [PATCH] feat(temporal): opt-in continue-as-new for long-lived agent workflows Long-lived chat/session agents run as a single Temporal workflow that stays open indefinitely, so their event history grows until it hits Temporal's ~50k-event / 50MB limit and the workflow stalls. This adds an opt-in continue-as-new path that recycles the history so a session can stay open forever, plus the discipline of keeping messages/state outside workflow state so they survive the recycle. SDK (BaseWorkflow): - should_continue_as_new(): recycle decision (Temporal's is_continue_as_new_suggested() or a configurable WORKFLOW_MAX_HISTORY_LENGTH threshold). - drain_and_continue_as_new(): waits all_handlers_finished (so an in-flight turn is never lost/duplicated at the boundary) then continue_as_new. - run_until_complete(): drop-in replacement for the usual wait_condition(timeout=None) tail; gated once behind workflow.patched() so in-flight pre-patch workflows keep the old behaviour (no non-determinism on replay). Identical behaviour unless WORKFLOW_CONTINUE_AS_NEW_ENABLED is set. - conversation_from_messages(): rebuild the conversation from the adk.messages ledger after a recycle (messages live in adk.messages, not workflow state). Config (default off, so existing agents are unaffected): - WORKFLOW_CONTINUE_AS_NEW_ENABLED (bool) - WORKFLOW_MAX_HISTORY_LENGTH (int|None) Examples: all 13 long-lived Temporal tutorial agents adopt run_until_complete. Message-based chat agents rebuild conversation from adk.messages; harness agents with an opaque session handle (claude-code, codex, claude-sdk) or rich history (pydantic-ai via ModelMessagesTypeAdapter, langgraph) persist their non-message state to adk.state and re-hydrate on recycle. Every adk.state / adk.messages round-trip is guarded by the enabled flag, so the default path is byte-for-byte unchanged. 
Note: continue-as-new bounds history SIZE; it does NOT extend the chain-wide WORKFLOW_EXECUTION_TIMEOUT_SECONDS (raise that to keep workflows long-lived). Co-Authored-By: Claude Opus 4.8 (1M context) --- .../000_hello_acp/project/workflow.py | 10 +- .../010_agent_chat/project/workflow.py | 50 ++++-- .../project/workflow.py | 31 ++-- .../project/workflow.py | 26 +-- .../project/workflow.py | 24 ++- .../project/workflow.py | 33 +++- .../project/workflow.py | 80 +++++++-- .../100_gemini_litellm/project/workflow.py | 26 +-- .../110_pydantic_ai/project/workflow.py | 93 +++++++++- .../120_openai_agents/project/workflow.py | 16 +- .../130_langgraph/project/workflow.py | 13 +- .../140_claude_code/project/workflow.py | 58 ++++++- .../10_temporal/150_codex/project/workflow.py | 68 +++++++- .../10_temporal/150_codex/tests/test_agent.py | 5 + .../lib/core/temporal/workflows/workflow.py | 163 ++++++++++++++++++ src/agentex/lib/environment_variables.py | 13 ++ .../test_base_workflow_continue_as_new.py | 94 ++++++++++ uv.lock | 4 +- 18 files changed, 714 insertions(+), 93 deletions(-) create mode 100644 tests/lib/core/temporal/test_base_workflow_continue_as_new.py diff --git a/examples/tutorials/10_async/10_temporal/000_hello_acp/project/workflow.py b/examples/tutorials/10_async/10_temporal/000_hello_acp/project/workflow.py index 2ca0858ba..ccac5dfe7 100644 --- a/examples/tutorials/10_async/10_temporal/000_hello_acp/project/workflow.py +++ b/examples/tutorials/10_async/10_temporal/000_hello_acp/project/workflow.py @@ -65,7 +65,9 @@ async def on_task_create(self, params: CreateTaskParams) -> None: # 2. Wait for the task to be completed indefinitely. If we don't do this the workflow will close as soon as this function returns. Temporal can run hundreds of millions of workflows in parallel, so you don't need to worry about too many workflows running at once. # Thus, if you want this agent to field events indefinitely (or for a long time) you need to wait for a condition to be met. 
- await workflow.wait_condition( - lambda: self._complete_task, - timeout=None, # Set a timeout if you want to prevent the task from running indefinitely. Generally this is not needed. Temporal can run hundreds of millions of workflows in parallel and more. Only do this if you have a specific reason to do so. - ) + # run_until_complete behaves exactly like the old indefinite wait_condition + # by default; when WORKFLOW_CONTINUE_AS_NEW_ENABLED is set it also recycles + # event history via continue-as-new before Temporal's history limit. This + # agent keeps no cross-turn conversation state, so the only thing carried + # forward across a recycle is `params`. + await self.run_until_complete(params, is_complete=lambda: self._complete_task) diff --git a/examples/tutorials/10_async/10_temporal/010_agent_chat/project/workflow.py b/examples/tutorials/10_async/10_temporal/010_agent_chat/project/workflow.py index 3e3ac5b27..dd2229713 100644 --- a/examples/tutorials/10_async/10_temporal/010_agent_chat/project/workflow.py +++ b/examples/tutorials/10_async/10_temporal/010_agent_chat/project/workflow.py @@ -160,6 +160,19 @@ def __init__(self): self._complete_task = False self._state: StateModel | None = None + async def _rehydrate_state(self, task_id: str) -> None: + """Seed in-memory state, rebuilding the conversation from adk.messages. + + Messages are the source of truth and live in adk.messages — NOT in workflow + state. On a brand-new task the ledger is empty (fresh start); on a continued + run (after continue-as-new) we reconstruct input_list from the ledger so the + recycle is invisible to the user. ``conversation_from_messages`` returns [] + when continue-as-new is disabled, so the default path is unchanged. 
+ """ + conversation = await self.conversation_from_messages(task_id) + turn_number = sum(1 for entry in conversation if entry["role"] == "user") + self._state = StateModel(input_list=conversation, turn_number=turn_number) + @workflow.signal(name=SignalName.RECEIVE_EVENT) @override async def on_task_event_send(self, params: SendEventParams) -> None: @@ -253,24 +266,29 @@ async def on_task_event_send(self, params: SendEventParams) -> None: if span and self._state: span.output = self._state.model_dump() + # NOTE: we do NOT persist the conversation to workflow state or adk.state. + # Every user + agent message is already written to the adk.messages ledger + # (the user echo above + the agent auto-send), which is the source of truth. + # On a continue-as-new recycle we rebuild input_list from that ledger in + # _rehydrate_state — messages live in adk.messages, state in adk.state. + @workflow.run @override async def on_task_create(self, params: CreateTaskParams) -> None: logger.info(f"Received task create params: {params}") - # 1. Initialize the state. You can either do this here or in the __init__ method. - # This function is triggered whenever a client creates a task for this agent. - # It is not re-triggered when a new event is sent to the task. - self._state = StateModel( - input_list=[], - turn_number=0, - ) - - # 2. Wait for the task to be completed indefinitely. If we don't do this the workflow will close as soon as this function returns. Temporal can run hundreds of millions of workflows in parallel, so you don't need to worry about too many workflows running at once. - - # Thus, if you want this agent to field events indefinitely (or for a long time) you need to wait for a condition to be met. - - await workflow.wait_condition( - lambda: self._complete_task, - timeout=None, # Set a timeout if you want to prevent the task from running indefinitely. Generally this is not needed. Temporal can run hundreds of millions of workflows in parallel and more. 
Only do this if you have a specific reason to do so. - ) + # 1. Initialize (or, on a continued run, re-hydrate) the state. This + # function runs both when a client first creates the task AND when the + # workflow recycles itself via continue-as-new — on the latter the + # in-memory state was reset, so we reload it from the adk.messages ledger. + await self._rehydrate_state(params.task.id) + + # 2. Keep the workflow open to field events. Temporal can run hundreds of + # millions of workflows in parallel, so staying open is cheap — but a single + # ever-open run accumulates event history until it hits Temporal's ~50k-event + # / 50MB limit. run_until_complete keeps the workflow open exactly like the + # old indefinite wait, and (when WORKFLOW_CONTINUE_AS_NEW_ENABLED is set) + # recycles history via continue-as-new before that limit. The conversation + # is rebuilt from adk.messages on each run, so the carry-forward is just + # `params`. + await self.run_until_complete(params, is_complete=lambda: self._complete_task) diff --git a/examples/tutorials/10_async/10_temporal/050_agent_chat_guardrails/project/workflow.py b/examples/tutorials/10_async/10_temporal/050_agent_chat_guardrails/project/workflow.py index b54c8fade..49fc403e7 100644 --- a/examples/tutorials/10_async/10_temporal/050_agent_chat_guardrails/project/workflow.py +++ b/examples/tutorials/10_async/10_temporal/050_agent_chat_guardrails/project/workflow.py @@ -455,14 +455,16 @@ async def on_task_event_send(self, params: SendEventParams) -> None: async def on_task_create(self, params: CreateTaskParams) -> None: logger.info(f"Received task create params: {params}") - # 1. Initialize the state. You can either do this here or in the - # __init__ method. This function is triggered whenever a client - # creates a task for this agent. It is not re-triggered when a new - # event is sent to the task. - self._state = StateModel( - input_list=[], - turn_number=0, - ) + # 1. 
Initialize (or, on a continued run, re-hydrate) the state. Messages + # are the source of truth and live in adk.messages — NOT in workflow + # state. On a brand-new task the ledger is empty (fresh start); on a + # continued run (after continue-as-new) we reconstruct input_list from + # the ledger so the recycle is invisible to the user. + # conversation_from_messages returns [] when continue-as-new is disabled, + # so the default path is unchanged. + conversation = await self.conversation_from_messages(params.task.id) + turn_number = sum(1 for entry in conversation if entry["role"] == "user") + self._state = StateModel(input_list=conversation, turn_number=turn_number) # 2. Wait for the task to be completed indefinitely. If we don't do # this the workflow will close as soon as this function returns. @@ -472,10 +474,9 @@ async def on_task_create(self, params: CreateTaskParams) -> None: # Thus, if you want this agent to field events indefinitely (or for # a long time) you need to wait for a condition to be met. - await workflow.wait_condition( - lambda: self._complete_task, - timeout=None, # Set a timeout if you want to prevent the task - # from running indefinitely. Generally this is not needed. - # Temporal can run hundreds of millions of workflows in parallel - # and more. Only do this if you have a specific reason to do so. - ) + # run_until_complete keeps the workflow open exactly like the old + # indefinite wait, and (when WORKFLOW_CONTINUE_AS_NEW_ENABLED is set) + # recycles event history via continue-as-new before Temporal's + # ~50k-event / 50MB limit. The conversation is rebuilt from adk.messages + # on each run, so the carry-forward is just `params`. 
+ await self.run_until_complete(params, is_complete=lambda: self._complete_task) diff --git a/examples/tutorials/10_async/10_temporal/060_open_ai_agents_sdk_hello_world/project/workflow.py b/examples/tutorials/10_async/10_temporal/060_open_ai_agents_sdk_hello_world/project/workflow.py index e01f40ce6..8381108e3 100644 --- a/examples/tutorials/10_async/10_temporal/060_open_ai_agents_sdk_hello_world/project/workflow.py +++ b/examples/tutorials/10_async/10_temporal/060_open_ai_agents_sdk_hello_world/project/workflow.py @@ -258,12 +258,16 @@ async def on_task_create(self, params: CreateTaskParams) -> str: # ============================================================================ # WORKFLOW INITIALIZATION: Initialize State # ============================================================================ - # Initialize the conversation state with an empty history - # This will be populated as the conversation progresses - self._state = StateModel( - input_list=[], - turn_number=0, - ) + # Initialize (or, on a continued run, re-hydrate) the conversation state. + # Messages are the source of truth and live in adk.messages — NOT in + # workflow state. On a brand-new task the ledger is empty (fresh start); + # on a continued run (after continue-as-new) we reconstruct input_list + # from the ledger so the recycle is invisible to the user. + # conversation_from_messages returns [] when continue-as-new is disabled, + # so the default path is unchanged. 
+ conversation = await self.conversation_from_messages(params.task.id) + turn_number = sum(1 for entry in conversation if entry["role"] == "user") + self._state = StateModel(input_list=conversation, turn_number=turn_number) # ============================================================================ # WORKFLOW INITIALIZATION: Send Welcome Message @@ -294,10 +298,12 @@ async def on_task_create(self, params: CreateTaskParams) -> str: # - Temporal can handle millions of such concurrent workflows # - If worker crashes, workflow resumes exactly where it left off # - All conversation state is preserved in Temporal's event log - await workflow.wait_condition( - lambda: self._complete_task, - timeout=None, # No timeout = truly long-running agent conversation - ) + # run_until_complete keeps the workflow open exactly like the old + # indefinite wait, and (when WORKFLOW_CONTINUE_AS_NEW_ENABLED is set) + # recycles event history via continue-as-new before Temporal's + # ~50k-event / 50MB limit. The conversation is rebuilt from adk.messages + # on each run, so the carry-forward is just `params`. 
+ await self.run_until_complete(params, is_complete=lambda: self._complete_task) return "Agent conversation completed" @workflow.signal diff --git a/examples/tutorials/10_async/10_temporal/070_open_ai_agents_sdk_tools/project/workflow.py b/examples/tutorials/10_async/10_temporal/070_open_ai_agents_sdk_tools/project/workflow.py index 2204d3a05..5ed2be84b 100644 --- a/examples/tutorials/10_async/10_temporal/070_open_ai_agents_sdk_tools/project/workflow.py +++ b/examples/tutorials/10_async/10_temporal/070_open_ai_agents_sdk_tools/project/workflow.py @@ -331,10 +331,17 @@ async def on_task_event_send(self, params: SendEventParams) -> None: async def on_task_create(self, params: CreateTaskParams) -> str: logger.info(f"Received task create params: {params}") - # Initialize the conversation state with an empty history + # Initialize (or, on a continued run, re-hydrate) the conversation state. + # This runs both when a client first creates the task AND when the workflow + # recycles itself via continue-as-new — on the latter the in-memory state was + # reset, so we rebuild input_list from the adk.messages ledger (the source of + # truth). conversation_from_messages returns [] when continue-as-new is + # disabled, so the default path is an empty fresh start as before. + conversation = await self.conversation_from_messages(params.task.id) + turn_number = sum(1 for entry in conversation if entry["role"] == "user") self._state = StateModel( - input_list=[], - turn_number=0, + input_list=conversation, + turn_number=turn_number, ) # 1. Acknowledge that the task has been created. @@ -346,10 +353,13 @@ async def on_task_create(self, params: CreateTaskParams) -> str: ), ) - await workflow.wait_condition( - lambda: self._complete_task, - timeout=None, # Set a timeout if you want to prevent the task from running indefinitely. Generally this is not needed. Temporal can run hundreds of millions of workflows in parallel and more. Only do this if you have a specific reason to do so. 
- ) + # Keep the workflow open to field events. run_until_complete behaves exactly + # like the old indefinite wait_condition, and (when + # WORKFLOW_CONTINUE_AS_NEW_ENABLED is set) recycles event history via + # continue-as-new before Temporal's ~50k-event / 50MB limit. Messages live in + # adk.messages and are rebuilt above, so the only state carried forward is + # params. + await self.run_until_complete(params, is_complete=lambda: self._complete_task) return "Task completed" @workflow.signal diff --git a/examples/tutorials/10_async/10_temporal/080_open_ai_agents_sdk_human_in_the_loop/project/workflow.py b/examples/tutorials/10_async/10_temporal/080_open_ai_agents_sdk_human_in_the_loop/project/workflow.py index 4f11ac4c0..3195a0f47 100644 --- a/examples/tutorials/10_async/10_temporal/080_open_ai_agents_sdk_human_in_the_loop/project/workflow.py +++ b/examples/tutorials/10_async/10_temporal/080_open_ai_agents_sdk_human_in_the_loop/project/workflow.py @@ -216,10 +216,24 @@ async def on_task_create(self, params: CreateTaskParams) -> str: """ logger.info(f"Received task create params: {params}") - # Initialize the conversation state with an empty history + # Initialize (or, on a continued run, re-hydrate) the conversation state. + # This runs both when a client first creates the task AND when the workflow + # recycles itself via continue-as-new — on the latter the in-memory state was + # reset, so we rebuild input_list from the adk.messages ledger (the source of + # truth). conversation_from_messages returns [] when continue-as-new is + # disabled, so the default path is an empty fresh start as before. 
+ # + # The only cross-turn parent state here is input_list/turn_number; the + # human-approval state lives entirely in the child workflow spawned by + # wait_for_confirmation (it waits for fulfill_order_signal), and an in-flight + # approval turn keeps on_task_event_send running — run_until_complete drains + # all in-flight handlers before recycling, so it is never cut off. Hence + # conversation_from_messages alone is enough; no adk.state is needed. + conversation = await self.conversation_from_messages(params.task.id) + turn_number = sum(1 for entry in conversation if entry["role"] == "user") self._state = StateModel( - input_list=[], - turn_number=0, + input_list=conversation, + turn_number=turn_number, ) # Send welcome message when task is created @@ -231,12 +245,13 @@ async def on_task_create(self, params: CreateTaskParams) -> str: ), ) - # Keep workflow running indefinitely to handle user messages and human approvals - # This survives system failures and can resume exactly where it left off - await workflow.wait_condition( - lambda: self._complete_task, - timeout=None, # No timeout for long-running human-in-the-loop workflows - ) + # Keep the workflow open to handle user messages and human approvals. + # run_until_complete behaves exactly like the old indefinite wait_condition, + # and (when WORKFLOW_CONTINUE_AS_NEW_ENABLED is set) recycles event history + # via continue-as-new before Temporal's ~50k-event / 50MB limit, draining any + # in-flight approval turn first. Messages live in adk.messages and are rebuilt + # above, so the only state carried forward is params. 
+ await self.run_until_complete(params, is_complete=lambda: self._complete_task) return "Task completed" # TEMPORAL UI (localhost:8080): diff --git a/examples/tutorials/10_async/10_temporal/090_claude_agents_sdk_mvp/project/workflow.py b/examples/tutorials/10_async/10_temporal/090_claude_agents_sdk_mvp/project/workflow.py index c22045152..d978ada1d 100644 --- a/examples/tutorials/10_async/10_temporal/090_claude_agents_sdk_mvp/project/workflow.py +++ b/examples/tutorials/10_async/10_temporal/090_claude_agents_sdk_mvp/project/workflow.py @@ -86,6 +86,64 @@ def __init__(self): self._trace_id = None self._parent_span_id = None self._workspace_path = None + # adk.state row id for the opaque session handle (continue-as-new only). + self._state_id: str | None = None + + async def _rehydrate_state(self, task_id: str) -> None: + """Initialize state, restoring the Claude session handle on a continued run. + + ``claude_session_id`` is an opaque CLI handle that maintains conversation + context — it CANNOT be rebuilt from adk.messages, so to survive a + continue-as-new recycle it is persisted to adk.state (the sanctioned home + for non-message state). When continue-as-new is disabled (the default) this + makes ZERO extra activity calls and behaves exactly like the old fresh init. + """ + if self._continue_as_new_enabled and environment_variables.AGENT_ID: + existing = await adk.state.get_by_task_and_agent( + task_id=task_id, + agent_id=environment_variables.AGENT_ID, + ) + if existing is not None: + self._state_id = existing.id + self._state = StateModel( + claude_session_id=existing.state.get("claude_session_id"), + turn_number=existing.state.get("turn_number", 0), + ) + return + # Fresh start (default path). + self._state = StateModel( + claude_session_id=None, + turn_number=0, + ) + + async def _persist_state(self, task_id: str) -> None: + """Persist the opaque Claude session handle + turn number to adk.state. 
+ + Guarded by continue-as-new + AGENT_ID so the default path makes ZERO extra + activity calls. Creates the row on first write, updates it thereafter. + """ + if not (self._continue_as_new_enabled and environment_variables.AGENT_ID): + return + if self._state is None: + return + state_payload = { + "claude_session_id": self._state.claude_session_id, + "turn_number": self._state.turn_number, + } + if self._state_id is None: + created = await adk.state.create( + task_id=task_id, + agent_id=environment_variables.AGENT_ID, + state=state_payload, + ) + self._state_id = created.id + else: + await adk.state.update( + state_id=self._state_id, + task_id=task_id, + agent_id=environment_variables.AGENT_ID, + state=state_payload, + ) @workflow.signal(name=SignalName.RECEIVE_EVENT) async def on_task_event_send(self, params: SendEventParams): @@ -171,6 +229,10 @@ async def on_task_event_send(self, params: SendEventParams): # Response already streamed to UI by activity - no need to send again logger.debug(f"Turn {self._state.turn_number} completed successfully") + # Persist the opaque session handle so it survives a + # continue-as-new recycle (no-op when the feature is disabled). + await self._persist_state(params.task.id) + except Exception as e: logger.error(f"Error running Claude agent: {e}", exc_info=True) # Send error message to user @@ -189,11 +251,9 @@ async def on_task_create(self, params: CreateTaskParams): logger.info(f"Creating Claude MVP workflow for task: {params.task.id}") - # Initialize state with session tracking - self._state = StateModel( - claude_session_id=None, - turn_number=0, - ) + # Initialize state, or (on a continue-as-new recycle) re-hydrate the + # opaque Claude session handle from adk.state. Default path is a fresh init. 
+ await self._rehydrate_state(params.task.id) # Create workspace via activity (avoids determinism issues with file I/O) workspace_root = os.environ.get("CLAUDE_WORKSPACE_ROOT") @@ -223,12 +283,12 @@ async def on_task_create(self, params: CreateTaskParams): ) ) - # Wait for completion signal + # Keep the workflow open to field events. run_until_complete behaves + # exactly like the old indefinite wait unless continue-as-new is enabled, + # in which case it recycles event history before Temporal's limit. The + # opaque session handle is carried across recycles via adk.state. logger.info("Waiting for task completion...") - await workflow.wait_condition( - lambda: self._complete_task, - timeout=None, # Long-running workflow - ) + await self.run_until_complete(params, is_complete=lambda: self._complete_task) logger.info("Claude MVP workflow completed") return "Task completed successfully" diff --git a/examples/tutorials/10_async/10_temporal/100_gemini_litellm/project/workflow.py b/examples/tutorials/10_async/10_temporal/100_gemini_litellm/project/workflow.py index 249bdaa50..2f9e30f50 100644 --- a/examples/tutorials/10_async/10_temporal/100_gemini_litellm/project/workflow.py +++ b/examples/tutorials/10_async/10_temporal/100_gemini_litellm/project/workflow.py @@ -200,11 +200,16 @@ async def on_task_create(self, params: CreateTaskParams) -> str: """ logger.info(f"Received task create params: {params}") - # Initialize the conversation state - self._state = StateModel( - input_list=[], - turn_number=0, - ) + # Initialize (or, on a continued run, re-hydrate) the conversation state. + # Messages are the source of truth and live in adk.messages — NOT in + # workflow state. On a brand-new task the ledger is empty (fresh start); + # on a continued run (after continue-as-new) we reconstruct input_list + # from the ledger so the recycle is invisible to the user. + # conversation_from_messages returns [] when continue-as-new is disabled, + # so the default path is unchanged. 
+ conversation = await self.conversation_from_messages(params.task.id) + turn_number = sum(1 for entry in conversation if entry["role"] == "user") + self._state = StateModel(input_list=conversation, turn_number=turn_number) # Send welcome message await adk.messages.create( @@ -220,11 +225,12 @@ async def on_task_create(self, params: CreateTaskParams) -> str: ), ) - # Wait for completion signal - await workflow.wait_condition( - lambda: self._complete_task, - timeout=None, - ) + # run_until_complete keeps the workflow open exactly like the old + # indefinite wait, and (when WORKFLOW_CONTINUE_AS_NEW_ENABLED is set) + # recycles event history via continue-as-new before Temporal's + # ~50k-event / 50MB limit. The conversation is rebuilt from adk.messages + # on each run, so the carry-forward is just `params`. + await self.run_until_complete(params, is_complete=lambda: self._complete_task) return "Agent conversation completed" @workflow.signal diff --git a/examples/tutorials/10_async/10_temporal/110_pydantic_ai/project/workflow.py b/examples/tutorials/10_async/10_temporal/110_pydantic_ai/project/workflow.py index 9a01be7de..36b06500b 100644 --- a/examples/tutorials/10_async/10_temporal/110_pydantic_ai/project/workflow.py +++ b/examples/tutorials/10_async/10_temporal/110_pydantic_ai/project/workflow.py @@ -9,8 +9,19 @@ Multi-turn memory is kept on the workflow instance itself (``self._message_history``). Temporal's workflow state is already durable and -replay-safe, so unlike the async-base agent we don't need an external +replay-safe, so for crash recovery alone we don't need an external ``adk.state`` round-trip. + +When continue-as-new is enabled (``WORKFLOW_CONTINUE_AS_NEW_ENABLED``), the +workflow may recycle its event history mid-conversation, which resets all +in-memory state. To survive that recycle the conversation must live OUTSIDE +workflow state. 
Pydantic-ai ``ModelMessage`` objects carry tool rounds and are +not plain text (so the adk.messages ledger can't reconstruct them losslessly), +so we persist them to ``adk.state`` using pydantic-ai's official +``ModelMessagesTypeAdapter`` serializer and re-hydrate on each run. All of this +is gated behind ``self._continue_as_new_enabled`` (and a present ``AGENT_ID``), +so the default-off path makes zero extra activity calls and is byte-for-byte +unchanged. """ from __future__ import annotations @@ -20,6 +31,7 @@ from typing import TYPE_CHECKING from temporalio import workflow +from pydantic_ai.messages import ModelMessagesTypeAdapter from agentex.lib import adk from project.agent import TaskDeps, temporal_agent @@ -77,6 +89,68 @@ def __init__(self): # produced these messages, so the list is rebuilt deterministically if # the workflow ever recovers from a crash. self._message_history: list["ModelMessage"] = [] + # adk.state row id backing the persisted conversation, set lazily on the + # first persist (or on re-hydrate). Only used when continue-as-new is + # enabled; stays None otherwise. + self._state_id: str | None = None + + async def _rehydrate_state(self, task_id: str) -> None: + """Reload turn number + message history from adk.state on a continued run. + + Only does anything when continue-as-new is enabled AND an AGENT_ID is + present; otherwise the normal fresh in-memory init stands and no activity + is scheduled. On a brand-new task there is no stored state yet, so this is + a no-op there too. On a run that resumed via continue-as-new the stored + payload is found, ``self._state_id`` is set, ``self._turn_number`` is + restored, and ``self._message_history`` is deserialized losslessly via + ``ModelMessagesTypeAdapter`` (preserving tool rounds). 
+ """ + if not self._continue_as_new_enabled or not environment_variables.AGENT_ID: + return + state = await adk.state.get_by_task_and_agent( + task_id=task_id, + agent_id=environment_variables.AGENT_ID, + ) + if state is None: + return + self._state_id = state.id + self._turn_number = state.state.get("turn_number", 0) + self._message_history = ModelMessagesTypeAdapter.validate_python( + state.state.get("message_history", []) + ) + + async def _persist_state(self, task_id: str) -> None: + """Persist turn number + message history to adk.state for recycle survival. + + Only does anything when continue-as-new is enabled AND an AGENT_ID is + present; otherwise it returns immediately and schedules no activity, so + the default-off path is unchanged. The pydantic-ai ``ModelMessage`` list + is serialized to a JSON-safe object via ``ModelMessagesTypeAdapter`` so it + round-trips losslessly. Creates the state row on first call, updates it + thereafter. + """ + if not self._continue_as_new_enabled or not environment_variables.AGENT_ID: + return + payload = { + "turn_number": self._turn_number, + "message_history": ModelMessagesTypeAdapter.dump_python( + self._message_history, mode="json" + ), + } + if self._state_id is None: + state = await adk.state.create( + task_id=task_id, + agent_id=environment_variables.AGENT_ID, + state=payload, + ) + self._state_id = state.id + else: + await adk.state.update( + state_id=self._state_id, + task_id=task_id, + agent_id=environment_variables.AGENT_ID, + state=payload, + ) @workflow.signal(name=SignalName.RECEIVE_EVENT) async def on_task_event_send(self, params: SendEventParams) -> None: @@ -111,11 +185,21 @@ async def on_task_event_send(self, params: SendEventParams) -> None: if span: span.output = {"final_output": result.output} + # Persist the updated conversation OUTSIDE workflow state so it survives a + # continue-as-new recycle. No-op (zero activity calls) unless + # continue-as-new is enabled and an AGENT_ID is present. 
+ await self._persist_state(params.task.id) + @workflow.run async def on_task_create(self, params: CreateTaskParams) -> str: """Workflow entry point — keep the conversation alive for incoming signals.""" logger.info(f"Task created: {params.task.id}") + # On a run resumed via continue-as-new the in-memory conversation was + # reset; reload it from adk.state. No-op on a brand-new task or when + # continue-as-new is disabled (the default). + await self._rehydrate_state(params.task.id) + await adk.messages.create( task_id=params.task.id, content=TextContent( @@ -127,7 +211,12 @@ async def on_task_create(self, params: CreateTaskParams) -> str: ), ) - await workflow.wait_condition(lambda: self._complete_task, timeout=None) + # Keep the workflow open to field events. Identical to the old indefinite + # wait unless WORKFLOW_CONTINUE_AS_NEW_ENABLED is set, in which case the + # event history is recycled via continue-as-new before it hits Temporal's + # ~50k-event / 50MB limit. The conversation is persisted to adk.state and + # re-hydrated on each run, so the carry-forward is just `params`. + await self.run_until_complete(params, is_complete=lambda: self._complete_task) return "Task completed" @workflow.signal diff --git a/examples/tutorials/10_async/10_temporal/120_openai_agents/project/workflow.py b/examples/tutorials/10_async/10_temporal/120_openai_agents/project/workflow.py index 5cb8fb38b..9275a583f 100644 --- a/examples/tutorials/10_async/10_temporal/120_openai_agents/project/workflow.py +++ b/examples/tutorials/10_async/10_temporal/120_openai_agents/project/workflow.py @@ -102,6 +102,15 @@ async def on_task_create(self, params: CreateTaskParams) -> str: """Workflow entry point — keep the conversation alive for incoming signals.""" logger.info(f"Task created: {params.task.id}") + # Rebuild the running conversation from the adk.messages ledger (the source + # of truth), then derive the turn number. 
On a brand-new task the ledger is + # empty so this is a no-op fresh start; on a continued run (after a + # continue-as-new recycle) it reconstructs self._messages so the recycle is + # invisible to the user. conversation_from_messages returns [] when + # continue-as-new is disabled, so the default path is unchanged. + self._messages = await self.conversation_from_messages(params.task.id) + self._turn_number = sum(1 for m in self._messages if m["role"] == "user") + await adk.messages.create( task_id=params.task.id, content=TextContent( @@ -114,7 +123,12 @@ async def on_task_create(self, params: CreateTaskParams) -> str: ), ) - await workflow.wait_condition(lambda: self._complete_task, timeout=None) + # run_until_complete behaves exactly like the old indefinite wait_condition + # by default; when WORKFLOW_CONTINUE_AS_NEW_ENABLED is set it recycles event + # history via continue-as-new before Temporal's history limit. The + # conversation is rebuilt from adk.messages on each run (above), so the only + # thing carried forward across a recycle is `params`. + await self.run_until_complete(params, is_complete=lambda: self._complete_task) return "Task completed" @workflow.signal diff --git a/examples/tutorials/10_async/10_temporal/130_langgraph/project/workflow.py b/examples/tutorials/10_async/10_temporal/130_langgraph/project/workflow.py index b9224ca00..9a5d24cc6 100644 --- a/examples/tutorials/10_async/10_temporal/130_langgraph/project/workflow.py +++ b/examples/tutorials/10_async/10_temporal/130_langgraph/project/workflow.py @@ -65,6 +65,17 @@ async def complete_task_signal(self) -> None: @workflow.run async def on_task_create(self, params: CreateTaskParams) -> str: + # Rebuild the conversation from the adk.messages ledger. This is a no-op + # fresh start by default (returns [] when continue-as-new is disabled); + # on a continue-as-new recycle it reloads the prior turns so the recycle + # is invisible. 
These rebuilt messages are already in the ledger, so set + # the high-water mark to their count to avoid re-emitting them. + # NOTE: rebuilt messages are plain {"role", "content"} text-only dicts; + # LangGraph accepts these as input, but any tool-call structure from + # prior turns is not reconstructed — acceptable at a turn boundary. + self._messages = await self.conversation_from_messages(params.task.id) + self._emitted = len(self._messages) + await adk.messages.create( task_id=params.task.id, content=TextContent( @@ -76,5 +87,5 @@ async def on_task_create(self, params: CreateTaskParams) -> str: ), ), ) - await workflow.wait_condition(lambda: self._complete_task, timeout=None) + await self.run_until_complete(params, is_complete=lambda: self._complete_task) return "Task completed" diff --git a/examples/tutorials/10_async/10_temporal/140_claude_code/project/workflow.py b/examples/tutorials/10_async/10_temporal/140_claude_code/project/workflow.py index 7f50ba8d5..aba8a5bff 100644 --- a/examples/tutorials/10_async/10_temporal/140_claude_code/project/workflow.py +++ b/examples/tutorials/10_async/10_temporal/140_claude_code/project/workflow.py @@ -71,6 +71,54 @@ def __init__(self): self._turn_number = 0 # Claude Code session_id for multi-turn resume. self._session_id: str | None = None + # adk.state row id for the opaque session handle (continue-as-new only). + self._state_id: str | None = None + + async def _rehydrate_state(self, task_id: str) -> None: + """Restore the Claude Code session handle on a continue-as-new recycle. + + ``session_id`` is an opaque CLI handle (``-r SESSION_ID``) that maintains + conversation context — it CANNOT be rebuilt from adk.messages, so to survive + a continue-as-new recycle it is persisted to adk.state (the sanctioned home + for non-message state). When continue-as-new is disabled (the default) this + makes ZERO extra activity calls and leaves the normal fresh init in place.
+ """ + if self._continue_as_new_enabled and environment_variables.AGENT_ID: + existing = await adk.state.get_by_task_and_agent( + task_id=task_id, + agent_id=environment_variables.AGENT_ID, + ) + if existing is not None: + self._state_id = existing.id + self._session_id = existing.state.get("session_id") + self._turn_number = existing.state.get("turn_number", 0) + + async def _persist_state(self, task_id: str) -> None: + """Persist the opaque Claude Code session handle + turn number to adk.state. + + Guarded by continue-as-new + AGENT_ID so the default path makes ZERO extra + activity calls. Creates the row on first write, updates it thereafter. + """ + if not (self._continue_as_new_enabled and environment_variables.AGENT_ID): + return + state_payload = { + "session_id": self._session_id, + "turn_number": self._turn_number, + } + if self._state_id is None: + created = await adk.state.create( + task_id=task_id, + agent_id=environment_variables.AGENT_ID, + state=state_payload, + ) + self._state_id = created.id + else: + await adk.state.update( + state_id=self._state_id, + task_id=task_id, + agent_id=environment_variables.AGENT_ID, + state=state_payload, + ) @workflow.signal(name=SignalName.RECEIVE_EVENT) async def on_task_event_send(self, params: SendEventParams) -> None: @@ -113,10 +161,18 @@ async def on_task_event_send(self, params: SendEventParams) -> None: if span: span.output = {"final_text": result.get("final_text")} + # Persist the opaque session handle so it survives a continue-as-new + # recycle (no-op when the feature is disabled). + await self._persist_state(task_id) + @workflow.run async def on_task_create(self, params: CreateTaskParams) -> str: logger.info("Task created: %s", params.task.id) + # On a continue-as-new recycle, restore the opaque session handle from + # adk.state. Default path (feature off) is a no-op fresh start. 
+ await self._rehydrate_state(params.task.id) + await adk.messages.create( task_id=params.task.id, content=TextContent( @@ -128,7 +184,7 @@ async def on_task_create(self, params: CreateTaskParams) -> str: ), ) - await workflow.wait_condition(lambda: self._complete_task, timeout=None) + await self.run_until_complete(params, is_complete=lambda: self._complete_task) return "Task completed" @workflow.signal diff --git a/examples/tutorials/10_async/10_temporal/150_codex/project/workflow.py b/examples/tutorials/10_async/10_temporal/150_codex/project/workflow.py index 1970b478f..778d324f1 100644 --- a/examples/tutorials/10_async/10_temporal/150_codex/project/workflow.py +++ b/examples/tutorials/10_async/10_temporal/150_codex/project/workflow.py @@ -14,8 +14,9 @@ ``StreamTaskMessage*`` events to Redis so the UI sees tokens in real time. - Passing ``created_at=workflow.now()`` for deterministic timestamps under Temporal replay (required for Temporal-safe delivery). -- Persisting the codex thread ID on the workflow instance itself — Temporal's - workflow state is durable, so no external ``adk.state`` round-trip is needed. +- Persisting the codex thread ID on the workflow instance for in-run durability; + when continue-as-new is enabled it is also flushed to ``adk.state`` so the + thread survives a history recycle (gated off by default). """ from __future__ import annotations @@ -63,8 +64,9 @@ class AtHarnessCodexWorkflow(BaseWorkflow): """Long-running Temporal workflow that runs codex exec for each turn. Conversation state (codex thread ID + turn counter) is kept on the - workflow instance. Temporal's durable replay reconstructs this state if - the worker crashes, so no external ``adk.state`` round-trip is needed. + workflow instance, which Temporal's durable replay reconstructs if the + worker crashes. When continue-as-new is enabled it is additionally flushed + to ``adk.state`` so the thread survives a history recycle (off by default). 
""" def __init__(self): @@ -72,6 +74,54 @@ def __init__(self): self._complete_task = False self._turn_number = 0 self._codex_thread_id: str | None = None + # adk.state row id for the opaque session handle (continue-as-new only). + self._state_id: str | None = None + + async def _rehydrate_state(self, task_id: str) -> None: + """Restore the opaque codex thread id on a continue-as-new recycle. + + The codex thread id is an opaque handle that maintains conversation context + across turns — it CANNOT be rebuilt from adk.messages, so to survive a + continue-as-new recycle it is persisted to adk.state (the sanctioned home + for non-message state). When continue-as-new is disabled (the default) this + makes ZERO extra activity calls and leaves the normal fresh init in place. + """ + if self._continue_as_new_enabled and environment_variables.AGENT_ID: + existing = await adk.state.get_by_task_and_agent( + task_id=task_id, + agent_id=environment_variables.AGENT_ID, + ) + if existing is not None: + self._state_id = existing.id + self._codex_thread_id = existing.state.get("session_id") + self._turn_number = existing.state.get("turn_number", 0) + + async def _persist_state(self, task_id: str) -> None: + """Persist the opaque codex thread id + turn number to adk.state. + + Guarded by continue-as-new + AGENT_ID so the default path makes ZERO extra + activity calls. Creates the row on first write, updates it thereafter. 
+ """ + if not (self._continue_as_new_enabled and environment_variables.AGENT_ID): + return + state_payload = { + "session_id": self._codex_thread_id, + "turn_number": self._turn_number, + } + if self._state_id is None: + created = await adk.state.create( + task_id=task_id, + agent_id=environment_variables.AGENT_ID, + state=state_payload, + ) + self._state_id = created.id + else: + await adk.state.update( + state_id=self._state_id, + task_id=task_id, + agent_id=environment_variables.AGENT_ID, + state=state_payload, + ) @workflow.signal(name=SignalName.RECEIVE_EVENT) async def on_task_event_send(self, params: SendEventParams) -> None: @@ -118,11 +168,19 @@ async def on_task_event_send(self, params: SendEventParams) -> None: "model": result.get("model"), } + # Persist the opaque session handle so it survives a continue-as-new + # recycle (no-op when the feature is disabled). + await self._persist_state(params.task.id) + @workflow.run async def on_task_create(self, params: CreateTaskParams) -> str: """Workflow entry point — keep the conversation alive for incoming signals.""" logger.info("Task created: %s", params.task.id) + # On a continue-as-new recycle, restore the opaque codex thread id from + # adk.state. Default path (feature off) is a no-op fresh start. 
+ await self._rehydrate_state(params.task.id) + await adk.messages.create( task_id=params.task.id, content=TextContent( @@ -135,7 +193,7 @@ async def on_task_create(self, params: CreateTaskParams) -> str: ), ) - await workflow.wait_condition(lambda: self._complete_task, timeout=None) + await self.run_until_complete(params, is_complete=lambda: self._complete_task) return "Task completed" @workflow.signal diff --git a/examples/tutorials/10_async/10_temporal/150_codex/tests/test_agent.py b/examples/tutorials/10_async/10_temporal/150_codex/tests/test_agent.py index fa6c66083..86dd73594 100644 --- a/examples/tutorials/10_async/10_temporal/150_codex/tests/test_agent.py +++ b/examples/tutorials/10_async/10_temporal/150_codex/tests/test_agent.py @@ -151,6 +151,11 @@ async def _fake_execute_activity(_activity, params, **_kw): wf._codex_thread_id = None wf._complete_task = False wf._display_name = "test" + # __new__ bypasses BaseWorkflow.__init__, so set the continue-as-new + # config attributes it would have populated (feature off for this test). + wf._continue_as_new_enabled = False + wf._max_history_length = None + wf._state_id = None params = MagicMock() params.task.id = "task-temporal-offline-1" diff --git a/src/agentex/lib/core/temporal/workflows/workflow.py b/src/agentex/lib/core/temporal/workflows/workflow.py index 3e4498162..6502d7331 100644 --- a/src/agentex/lib/core/temporal/workflows/workflow.py +++ b/src/agentex/lib/core/temporal/workflows/workflow.py @@ -1,13 +1,26 @@ +from __future__ import annotations + from abc import ABC, abstractmethod +from typing import Any, Callable from temporalio import workflow from agentex.protocol.acp import SendEventParams, CreateTaskParams from agentex.lib.utils.logging import make_logger +from agentex.lib.environment_variables import EnvironmentVariables from agentex.lib.core.temporal.types.workflow import SignalName logger = make_logger(__name__) +# Patch identifier gating the continue-as-new recycle path. 
Workflows that were +# already running before this code shipped have no record of this patch in their +# event history; `workflow.patched()` lets the new drain + continue_as_new branch +# be introduced without breaking determinism when those in-flight executions +# replay against the new code. `run_until_complete` checks this gate ONCE per run +# (see the 010_agent_chat reference agent), so pre-patch runs keep their old +# behaviour and never spin; gate any hand-rolled recycle loop the same way. +CONTINUE_AS_NEW_PATCH_ID = "agentex-base-workflow-continue-as-new-v1" + + class BaseWorkflow(ABC): def __init__( @@ -15,6 +28,13 @@ def __init__( display_name: str, ): self.display_name = display_name + # Read once at construction. `refresh()` returns the process-wide cached + # config the worker loaded at import time, so this does not read os.environ + # during workflow execution (which the sandbox would flag) — the values are + # deployment constants and therefore replay-safe. + env = EnvironmentVariables.refresh() + self._continue_as_new_enabled: bool = env.WORKFLOW_CONTINUE_AS_NEW_ENABLED + self._max_history_length: int | None = env.WORKFLOW_MAX_HISTORY_LENGTH @abstractmethod @workflow.signal(name=SignalName.RECEIVE_EVENT) @@ -24,3 +44,146 @@ async def on_task_event_send(self, params: SendEventParams) -> None: @abstractmethod async def on_task_create(self, params: CreateTaskParams) -> None: raise NotImplementedError + + # ------------------------------------------------------------------ # + # Continue-as-new lifecycle helpers # + # # + # These let a long-lived chat/session workflow recycle its event # + # history so it can stay open indefinitely without hitting Temporal's # + # ~50k-event / 50MB history limit.
The SDK owns the genuinely hard, # + # easy-to-get-wrong Temporal mechanics here (threshold decision, # + # draining in-flight handlers, the continue_as_new call); the agent # + # owns its data and is responsible for keeping it OUTSIDE workflow # + # state so it survives the recycle: messages in `adk.messages`, any # + # other state in `adk.state`. See the 010_agent_chat reference agent # + # for the full recipe. # + # ------------------------------------------------------------------ # + + def should_continue_as_new(self) -> bool: + """Whether this run should recycle its event history via continue-as-new. + + Returns True only when continue-as-new is enabled for the agent AND + either: + - Temporal suggests it (history is approaching the server's limit), or + - the configured ``WORKFLOW_MAX_HISTORY_LENGTH`` threshold is reached. + + This reads only deterministic ``workflow.info()`` values and constant + config and emits no commands, so it is safe to use directly as a + ``workflow.wait_condition`` predicate, e.g.:: + + await workflow.wait_condition( + lambda: self._complete_task or self.should_continue_as_new() + ) + """ + if not self._continue_as_new_enabled: + return False + info = workflow.info() + if info.is_continue_as_new_suggested(): + return True + if ( + self._max_history_length is not None + and info.get_current_history_length() >= self._max_history_length + ): + return True + return False + + async def drain_and_continue_as_new(self, *args: Any) -> None: + """Drain in-flight signal handlers, then continue-as-new. + + Call this from the agent's ``@workflow.run`` once the run loop wakes for a + recycle (see :meth:`should_continue_as_new`). ``args`` are forwarded + verbatim to ``workflow.continue_as_new`` and become the new run's input, so + pass whatever your ``@workflow.run`` signature expects — typically the + original ``CreateTaskParams`` (the new run keeps the same workflow id / task + id and re-hydrates its state from ``adk.state``). 
+ + IMPORTANT: keep your data OUTSIDE workflow state BEFORE calling this — + messages in ``adk.messages`` and any other state in ``adk.state``. + In-workflow attributes do NOT survive the recycle; only the forwarded + ``args`` do. + + Waits on ``all_handlers_finished`` first so an in-flight turn (a signal + handler still running an activity) is never lost or duplicated across the + recycle boundary. ``workflow.continue_as_new`` raises to end the run, so + this never returns normally. + """ + # Don't recycle until any signal handler still running has finished, so a + # message mid-flight at the boundary is carried into the next run intact. + await workflow.wait_condition(workflow.all_handlers_finished) + logger.info( + "Recycling workflow via continue-as-new " + f"(history_length={workflow.info().get_current_history_length()}, " + f"run_id={workflow.info().run_id})" + ) + workflow.continue_as_new(*args) + + async def run_until_complete( + self, + *continue_as_new_args: Any, + is_complete: Callable[[], bool], + ) -> None: + """Keep the workflow open to field events, recycling history as needed. + + Drop-in replacement for the usual ``await workflow.wait_condition( + lambda: self._complete_task, timeout=None)`` at the end of an agent's + ``@workflow.run``. ``is_complete`` is a no-arg predicate (typically + ``lambda: self._complete_task``); ``continue_as_new_args`` are forwarded to + continue-as-new on recycle (typically the original ``CreateTaskParams``). + + Behaviour is identical to the old indefinite wait UNLESS + ``WORKFLOW_CONTINUE_AS_NEW_ENABLED`` is set, so adopting this is safe by + default. The recycle path is gated once behind ``workflow.patched(...)`` so + workflows that started before this code shipped keep waiting the old way + and never hit a non-determinism error on replay. 
+ + Persist anything you need across a recycle OUTSIDE workflow state first — + messages in ``adk.messages``, other state in ``adk.state`` — and rebuild it + at the top of ``@workflow.run`` (see :meth:`conversation_from_messages`). + """ + if not self._continue_as_new_enabled or not workflow.patched( + CONTINUE_AS_NEW_PATCH_ID + ): + await workflow.wait_condition(is_complete, timeout=None) + return + while True: + await workflow.wait_condition( + lambda: is_complete() or self.should_continue_as_new() + ) + if is_complete(): + return + # Drains in-flight handlers, then continue-as-new (raises; never returns). + await self.drain_and_continue_as_new(*continue_as_new_args) + + async def conversation_from_messages( + self, + task_id: str, + ) -> list[dict[str, str]]: + """Rebuild a role/content conversation list from the adk.messages ledger. + + Messages are the source of truth and live in adk.messages, not in workflow + state — so after a continue-as-new recycle the conversation is reconstructed + from the ledger rather than carried through workflow history. Returns an + OpenAI-style ``[{"role": "user"|"assistant", "content": str}, ...]`` list. + + Returns an empty list when continue-as-new is disabled (the default) or the + task has no messages yet, so callers can use it unconditionally to seed + their state. + """ + if not self._continue_as_new_enabled: + return [] + # Lazy import to avoid any import cycle at module load. + from agentex.lib import adk + from agentex.types.text_content import TextContent + + messages = await adk.messages.list(task_id=task_id) + conversation: list[dict[str, str]] = [] + for message in messages: + content = message.content + # Only text turns reconstruct the conversation; the isinstance check + # narrows the content union (DataContent / ToolRequestContent etc. + # have no `.content`/`.author`). 
+ if not isinstance(content, TextContent): + continue + role = "assistant" if content.author == "agent" else "user" + conversation.append({"role": role, "content": content.content}) + return conversation diff --git a/src/agentex/lib/environment_variables.py b/src/agentex/lib/environment_variables.py index 31ce43ab8..e2db6cad0 100644 --- a/src/agentex/lib/environment_variables.py +++ b/src/agentex/lib/environment_variables.py @@ -34,6 +34,8 @@ class EnvVarKeys(str, Enum): WORKFLOW_NAME = "WORKFLOW_NAME" WORKFLOW_TASK_QUEUE = "WORKFLOW_TASK_QUEUE" WORKFLOW_EXECUTION_TIMEOUT_SECONDS = "WORKFLOW_EXECUTION_TIMEOUT_SECONDS" + WORKFLOW_CONTINUE_AS_NEW_ENABLED = "WORKFLOW_CONTINUE_AS_NEW_ENABLED" + WORKFLOW_MAX_HISTORY_LENGTH = "WORKFLOW_MAX_HISTORY_LENGTH" # Temporal Worker Configuration HEALTH_CHECK_PORT = "HEALTH_CHECK_PORT" # Auth Configuration @@ -81,6 +83,17 @@ class EnvironmentVariables(BaseModel): # agents with longer-running tasks should override this. Must be > 0 — a # zero or negative timedelta would cause every submitted workflow to fail. WORKFLOW_EXECUTION_TIMEOUT_SECONDS: int = Field(default=86400, gt=0) + # Opt-in: when enabled, a BaseWorkflow-derived agent recycles its Temporal + # event history via continue-as-new before hitting the server's history limit, + # so a single chat/session can stay open indefinitely. Off by default so + # existing agents are unaffected. continue-as-new bounds HISTORY SIZE; it does + # NOT extend WORKFLOW_EXECUTION_TIMEOUT_SECONDS (that timeout is chain-wide and + # bounds wall-clock lifetime) — raise that knob too to keep workflows long-lived. + WORKFLOW_CONTINUE_AS_NEW_ENABLED: bool = False + # Optional history-length threshold (event count) at which to continue-as-new. + # When unset, recycling relies solely on Temporal's own is_continue_as_new_suggested() + # signal. Set a value below ~50k to recycle earlier with headroom. 
+ WORKFLOW_MAX_HISTORY_LENGTH: int | None = Field(default=None, gt=0) # Temporal Worker Configuration HEALTH_CHECK_PORT: int = 80 # Auth Configuration diff --git a/tests/lib/core/temporal/test_base_workflow_continue_as_new.py b/tests/lib/core/temporal/test_base_workflow_continue_as_new.py new file mode 100644 index 000000000..5d9999692 --- /dev/null +++ b/tests/lib/core/temporal/test_base_workflow_continue_as_new.py @@ -0,0 +1,94 @@ +"""Unit tests for BaseWorkflow's continue-as-new decision logic. + +These exercise ``should_continue_as_new`` in isolation by faking +``workflow.info()`` so we don't need a running Temporal server. The drain + +``workflow.continue_as_new`` mechanics in ``drain_and_continue_as_new`` are best +covered by a replay/integration test against a Temporal test environment (a +follow-up); here we lock down the threshold logic that decides *when* to recycle. +""" + +from __future__ import annotations + +from typing import override + +import pytest + +from agentex.lib.core.temporal.workflows import workflow as base_workflow_module +from agentex.lib.core.temporal.workflows.workflow import BaseWorkflow + + +class _ConcreteWorkflow(BaseWorkflow): + """Minimal concrete subclass so we can instantiate the ABC in a test. + + Bypasses BaseWorkflow.__init__ (which reads cached env config) and sets the + two config attributes directly so each test controls them explicitly. 
+ """ + + def __init__(self, *, enabled: bool, max_history_length: int | None) -> None: + self.display_name = "test" + self._continue_as_new_enabled = enabled + self._max_history_length = max_history_length + + @override + async def on_task_event_send(self, params) -> None: # pragma: no cover - unused + raise NotImplementedError + + @override + async def on_task_create(self, params) -> None: # pragma: no cover - unused + raise NotImplementedError + + +class _FakeInfo: + def __init__(self, *, suggested: bool, history_length: int) -> None: + self._suggested = suggested + self._history_length = history_length + + def is_continue_as_new_suggested(self) -> bool: + return self._suggested + + def get_current_history_length(self) -> int: + return self._history_length + + +@pytest.fixture +def patch_info(monkeypatch): + """Patch ``workflow.info`` used inside the BaseWorkflow module.""" + + def _apply(*, suggested: bool, history_length: int) -> None: + monkeypatch.setattr( + base_workflow_module.workflow, + "info", + lambda: _FakeInfo(suggested=suggested, history_length=history_length), + ) + + return _apply + + +def test_disabled_never_recycles(patch_info): + patch_info(suggested=True, history_length=10_000_000) + wf = _ConcreteWorkflow(enabled=False, max_history_length=1) + assert wf.should_continue_as_new() is False + + +def test_recycles_when_temporal_suggests(patch_info): + patch_info(suggested=True, history_length=5) + wf = _ConcreteWorkflow(enabled=True, max_history_length=None) + assert wf.should_continue_as_new() is True + + +def test_recycles_at_history_threshold(patch_info): + patch_info(suggested=False, history_length=10_000) + wf = _ConcreteWorkflow(enabled=True, max_history_length=10_000) + assert wf.should_continue_as_new() is True + + +def test_does_not_recycle_below_threshold(patch_info): + patch_info(suggested=False, history_length=9_999) + wf = _ConcreteWorkflow(enabled=True, max_history_length=10_000) + assert wf.should_continue_as_new() is False + + +def 
test_no_threshold_relies_only_on_suggestion(patch_info): + patch_info(suggested=False, history_length=10_000_000) + wf = _ConcreteWorkflow(enabled=True, max_history_length=None) + assert wf.should_continue_as_new() is False diff --git a/uv.lock b/uv.lock index 8a41ba29c..620e8a64e 100644 --- a/uv.lock +++ b/uv.lock @@ -15,7 +15,7 @@ members = [ [[package]] name = "agentex-client" -version = "0.13.0" +version = "0.15.0" source = { editable = "." } dependencies = [ { name = "anyio" }, @@ -91,7 +91,7 @@ dev = [ [[package]] name = "agentex-sdk" -version = "0.13.0" +version = "0.14.0" source = { editable = "adk" } dependencies = [ { name = "agentex-client" },