From 0b72587ff69ba62ddeca80ab2694df392b0fa3f5 Mon Sep 17 00:00:00 2001 From: Declan Brady Date: Wed, 24 Jun 2026 15:51:57 -0400 Subject: [PATCH 1/7] fix(openai-agents): harden sync converter + align hosted-tool response shape Addresses the three P1 Greptile findings on the new sync OpenAI converter (convert_openai_to_agentex_events / OpenAITurn) plus the hosted-tool content mismatch flagged in PR #443: - Parse raw tool-call arguments through a defensive _safe_parse_arguments helper so malformed/truncated/non-dict JSON no longer raises and aborts the turn (matches the Temporal streaming model fallback). - Emit StreamTaskMessageDone for completed reasoning content/summary items so UnifiedEmitter.auto_send releases the context and the reasoning span is closed (reasoning messages used to hang open). - Reserve a fresh message index for every new text item_id so a final answer cannot collide with the preceding reasoning message on reasoning-model streams. - Emit hosted/server-side tool responses as a plain string in TemporalStreamingModel, matching the function-tool response path. Adds regression tests for the converter helpers and reasoning/text sequencing. Co-Authored-By: Claude Opus 4.8 (1M context) --- adk/docs/harness.md | 6 + adk/docs/migration-0.16.0.md | 202 ++++++++++++++++++ src/agentex/lib/adk/_modules/_openai_sync.py | 83 ++++--- .../models/temporal_streaming_model.py | 5 +- tests/lib/adk/test_openai_sync.py | 167 +++++++++++++++ 5 files changed, 420 insertions(+), 43 deletions(-) create mode 100644 adk/docs/migration-0.16.0.md create mode 100644 tests/lib/adk/test_openai_sync.py diff --git a/adk/docs/harness.md b/adk/docs/harness.md index d81835a03..62094d469 100644 --- a/adk/docs/harness.md +++ b/adk/docs/harness.md @@ -198,3 +198,9 @@ result = await emitter.auto_send_turn(turn, created_at=workflow.now()) # result.final_text — last text segment # result.usage — TurnUsage (tokens, cost, ...) ``` + +--- + +## Migration + +- [Migrating to `agentex-client` 0.16.0 / `agentex-sdk` 0.15.0](./migration-0.16.0.md) — removed LangGraph/Pydantic-AI tracing handlers (tracing is now derived from the canonical stream), private `_modules` path moves, the OpenAI harness facade relocation, and the new `run_turn` Temporal entry point. diff --git a/adk/docs/migration-0.16.0.md b/adk/docs/migration-0.16.0.md new file mode 100644 index 000000000..6f808a0f1 --- /dev/null +++ b/adk/docs/migration-0.16.0.md @@ -0,0 +1,202 @@ +# Migration Guide — `agentex-client` 0.16.0 / `agentex-sdk` 0.15.0 + +This release consolidates the LangGraph, Pydantic-AI, and OpenAI Agents harnesses +onto the **unified harness surface** (`UnifiedEmitter` + `SpanDeriver`), introduces +`run_turn` as the single Temporal entry point for OpenAI Agents, renders +hosted/server-side tool calls in the Temporal streaming model, and ships new CLI +init templates. + +Most consumers only need to act on **section 1** (removed tracing handlers). +Sections 2–3 only matter if you import private modules. Section 4 lists the new, +opt-in capabilities. Section 5 documents the defect fixes shipped on top of the +release. + +--- + +## 1. Tracing handlers removed (LangGraph + Pydantic-AI) — **action required** + +The bespoke tracing callback handlers are **gone** from the public +`agentex.lib.adk` surface: + +| Removed | | +|---|---| +| `agentex.lib.adk.create_langgraph_tracing_handler` | + class `AgentexLangGraphTracingHandler` | +| `agentex.lib.adk.create_pydantic_ai_tracing_handler` | + class `AgentexPydanticAITracingHandler` | + +Span tracing is now **derived automatically** from the canonical +`StreamTaskMessage*` stream by `UnifiedEmitter`. You no longer construct or pass a +callback handler — you wrap the run in the harness `*Turn` and drive delivery +through the emitter, and spans fall out of the stream. + +### LangGraph + +**Before** + +```python +from agentex.lib import adk + +handler = adk.create_langgraph_tracing_handler( + trace_id=trace_id, + parent_span_id=parent_span_id, +) +result = await graph.ainvoke(state, config={"callbacks": [handler]}) +``` + +**After** + +```python +from agentex.lib.adk import stream_langgraph_events # facade name unchanged + +# Streaming delivery + tracing are handled for you; no callbacks wiring. +async for event in stream_langgraph_events(graph, state, ...): + ... +``` + +or, when you own the emitter directly: + +```python +from agentex.lib.adk import LangGraphTurn +from agentex.lib.core.harness import UnifiedEmitter + +emitter = UnifiedEmitter(...) +await emitter.auto_send_turn(LangGraphTurn(...)) # or: emitter.yield_turn(...) +``` + +### Pydantic-AI + +**Before** + +```python +handler = adk.create_pydantic_ai_tracing_handler(trace_id=..., parent_span_id=...) +``` + +**After** + +```python +from agentex.lib.adk import PydanticAITurn, stream_pydantic_ai_events +from agentex.lib.core.harness import UnifiedEmitter + +# Wrap in PydanticAITurn and drive UnifiedEmitter.yield_turn / auto_send_turn. +await UnifiedEmitter(...).auto_send_turn(PydanticAITurn(...)) +``` + +The `agentex init` templates were migrated to this pattern. If you scaffolded +from an older template, regenerate (or diff against a fresh template) for the +canonical shape. + +--- + +## 2. Private `_modules` import paths changed — **only if you import privates** + +Each harness now exposes exactly `__sync.py` + `__turn.py` under +`agentex.lib.adk._modules`. Several private modules were deleted and their +functions relocated. If you imported the **public facade names** from +`agentex.lib.adk`, **nothing changes**. Repoint only if you reached into the +private modules directly: + +| Old (deleted) private import | New location | Public facade (unchanged) | +|---|---|---| +| `_modules._langgraph_async.stream_langgraph_events` | `_modules._langgraph_turn` | `adk.stream_langgraph_events` | +| `_modules._langgraph_messages.emit_langgraph_messages` | `_modules._langgraph_sync` | `adk.emit_langgraph_messages` | +| `_modules._langgraph_tracing.*` | **removed** (see §1) | — | +| `_modules._pydantic_ai_async.stream_pydantic_ai_events` | `_modules._pydantic_ai_turn` | `adk.stream_pydantic_ai_events` | +| `_modules._pydantic_ai_tracing.*` | **removed** (see §1) | — | + +✅ These facade names are unchanged and keep working: +`stream_langgraph_events`, `emit_langgraph_messages`, +`convert_langgraph_to_agentex_events`, `LangGraphTurn`, +`stream_pydantic_ai_events`, `convert_pydantic_ai_to_agentex_events`, +`PydanticAITurn`. + +--- + +## 3. OpenAI harness moved into `adk/_modules` + facade export + +The OpenAI Agents harness now lives alongside the others: + +- `OpenAITurn`, `openai_usage_to_turn_usage` → `agentex.lib.adk._modules._openai_turn` +- `convert_openai_to_agentex_events` → `agentex.lib.adk._modules._openai_sync` + +New **public** facade exports (prefer these): + +```python +from agentex.lib.adk import ( + OpenAITurn, + convert_openai_to_agentex_events, + openai_usage_to_turn_usage, +) +``` + +Back-compat shims remain at +`agentex.lib.adk.providers._modules.{openai_turn,sync_provider}` **for one +release** — migrate to the facade names before the next minor. + +--- + +## 4. New capabilities (opt-in, no migration required) + +- **`run_turn` — unified Temporal entry point for OpenAI Agents.** + + ```python + from agentex.lib.core.temporal.plugins.openai_agents import run_turn, OpenAIAgentsTurnResult + + result = await run_turn( + agent, input, + task_id=task_id, + trace_id=trace_id, + parent_span_id=parent_span_id, + ) + result.final_output # raw SDK final_output + result.usage # normalized TurnUsage for the turn span + ``` + + It emits each tool call exactly once (the streaming model is the sole + tool-**request** emitter; hooks emit tool **responses**), traces per-tool spans, + normalizes token usage, and drains orphaned tool spans in a `finally` block if + the run terminates mid-tool. Existing `TemporalStreamingHooks` callers keep + working — `run_turn` is additive. If you pass your own `hooks` subclass, also + set `emit_tool_requests=False` and forward `trace_id` / `parent_span_id` + yourself (they are only auto-applied to the default hooks). + +- **Hosted / server-side tool rendering** in `TemporalStreamingModel`: + web_search, file_search, code_interpreter, image_generation, server-side mcp, + computer, and local_shell calls now surface as ToolRequest/ToolResponse pairs. + +- **New CLI init templates:** `default` / `sync` / `temporal` flavors of + `claude-code` and `codex`, plus `default-openai-agents`. + +--- + +## 5. Defect fixes shipped with this migration + +These fixes harden the newly-added sync OpenAI converter +(`convert_openai_to_agentex_events` / `OpenAITurn`) and the Temporal hosted-tool +path. No API change — behavior only. + +1. **Malformed tool arguments no longer abort the turn.** The converter now + parses raw tool-call arguments through a defensive helper + (`_safe_parse_arguments`): a non-decodable string is preserved under `raw` + and a non-dict JSON value under `value`, instead of raising `JSONDecodeError` + and killing the run before later output is delivered. This matches the + Temporal streaming model's existing fallback. + +2. **Reasoning messages are closed.** Completed reasoning content/summary items + now emit a matching `StreamTaskMessageDone`. Previously the `Done` was + skipped, so `UnifiedEmitter.auto_send` never released the context and the + reasoning span could be marked incomplete (reasoning-model output appeared to + hang). + +3. **Text no longer collides with reasoning.** Every new text `item_id` now + reserves a fresh message index (matching the increment-then-use convention of + the reasoning/tool paths). Previously the first text item reused the current + index, so on reasoning-model streams the final answer could overwrite the + reasoning message, duplicate a `Start`, or route deltas into the wrong context. + +4. **Hosted-tool response shape aligned.** Hosted/server-side tool responses in + `TemporalStreamingModel` now emit `content` as a plain string, matching the + function-tool response path (`on_tool_end`) so hosted and function tools + render identically within the same flow. + +> Action: if you adopted `OpenAITurn` for **reasoning models** (o1/o3/gpt-5) on +> the sync path before these fixes, upgrade — fixes 2 and 3 are required for +> correct reasoning rendering. diff --git a/src/agentex/lib/adk/_modules/_openai_sync.py b/src/agentex/lib/adk/_modules/_openai_sync.py index 75d8f8f2a..1d5194c76 100644 --- a/src/agentex/lib/adk/_modules/_openai_sync.py +++ b/src/agentex/lib/adk/_modules/_openai_sync.py @@ -12,6 +12,7 @@ from __future__ import annotations +import json from typing import Any from openai.types.responses import ( @@ -43,6 +44,27 @@ from agentex.types.reasoning_summary_delta import ReasoningSummaryDelta +def _safe_parse_arguments(arguments: Any) -> dict[str, Any]: + """Coerce a tool call's ``arguments`` into a dict, tolerating bad JSON. + + Mirrors the Temporal streaming model: malformed, truncated, or + provider-specific raw arguments must not abort the whole turn, so a + non-decodable string is preserved under ``raw`` instead of raising and a + non-dict JSON value is wrapped under ``value``. + """ + if not arguments: + return {} + if isinstance(arguments, dict): + return arguments + if isinstance(arguments, str): + try: + parsed = json.loads(arguments) + except (json.JSONDecodeError, ValueError): + return {"raw": arguments} + return parsed if isinstance(parsed, dict) else {"value": parsed} + return arguments + + def _extract_tool_call_info(tool_call_item: Any) -> tuple[str, str, dict[str, Any]]: """ Extract call_id, tool_name, and tool_arguments from a tool call item. @@ -69,30 +91,12 @@ def _extract_tool_call_info(tool_call_item: Any) -> tuple[str, str, dict[str, An elif isinstance(tool_call_item, ResponseFunctionToolCall): # Handle standard function tool calls tool_name = tool_call_item.name - # Handle the arguments field which might be a string or None - if tool_call_item.arguments: - if isinstance(tool_call_item.arguments, str): - import json - - tool_arguments = json.loads(tool_call_item.arguments) if tool_call_item.arguments else {} - else: - tool_arguments = tool_call_item.arguments - else: - tool_arguments = {} + tool_arguments = _safe_parse_arguments(tool_call_item.arguments) else: # Generic handling for any tool call type tool_name = getattr(tool_call_item, "name", type(tool_call_item).__name__) - # Handle the arguments field which might be a string or None if hasattr(tool_call_item, "arguments"): - arguments = tool_call_item.arguments - if isinstance(arguments, str): - import json - - tool_arguments = json.loads(arguments) if arguments else {} - elif arguments is None: - tool_arguments = {} - else: - tool_arguments = arguments + tool_arguments = _safe_parse_arguments(tool_call_item.arguments) else: tool_arguments = tool_call_item.model_dump() @@ -150,7 +154,6 @@ async def convert_openai_to_agentex_events(stream_response): tool_map = {} event_count = 0 message_index = 0 # Track message index for proper sequencing - seen_tool_output = False # Track if we've seen tool output to know when final text starts item_id_to_index = {} # Map item_id to message index item_id_to_type = {} # Map item_id to content type (text, reasoning_content, reasoning_summary) @@ -172,16 +175,16 @@ async def convert_openai_to_agentex_events(stream_response): elif isinstance(raw_event, ResponseOutputItemDoneEvent): item_id = raw_event.item.id if item_id in item_id_to_index: - # Get the message type to decide whether to send done event - message_type = item_id_to_type.get(item_id, "text") - - # Don't send done events for reasoning content/summary - # They just end with their last delta - if message_type not in ("reasoning_content", "reasoning_summary"): - yield StreamTaskMessageDone( - type="done", - index=item_id_to_index[item_id], - ) + # Close every streamed message — text AND reasoning — with a + # matching Done. UnifiedEmitter.auto_send only releases a + # context on StreamTaskMessageDone; skipping it for reasoning + # left those messages hanging and their spans incomplete. The + # accumulator rebuilds ReasoningContent from the deltas, so the + # Done carries no payload. + yield StreamTaskMessageDone( + type="done", + index=item_id_to_index[item_id], + ) # Skip reasoning summary part added events - we handle them on delta elif isinstance(raw_event, ResponseReasoningSummaryPartAddedEvent): @@ -292,17 +295,14 @@ async def convert_openai_to_agentex_events(stream_response): # Check if this event has an item_id item_id = getattr(raw_event, "item_id", None) - # If this is a new item_id we haven't seen, it's a new message + # If this is a new item_id we haven't seen, it's a new message. + # Reserve a fresh index for every text item_id (matching the + # increment-then-use convention of the reasoning/tool paths). + # Reusing the current index let a final answer collide with the + # preceding reasoning message on reasoning-model streams. if item_id and item_id not in item_id_to_index: - # Check if this is truly a NEW text message after tools - # We need to differentiate between the first text and the final text after tools - if seen_tool_output: - # This is the final text message after tool execution - message_index += 1 - item_id_to_index[item_id] = message_index - else: - item_id_to_index[item_id] = message_index - + message_index += 1 + item_id_to_index[item_id] = message_index item_id_to_type[item_id] = "text" # Send a start event with empty content for this new text message @@ -363,7 +363,6 @@ async def convert_openai_to_agentex_events(stream_response): author="agent", ) message_index += 1 # Increment for new message - seen_tool_output = True # Mark that we've seen tool output so next text gets new index yield StreamTaskMessageFull( type="full", index=message_index, diff --git a/src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py b/src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py index 38e3503d7..7c8690f21 100644 --- a/src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py +++ b/src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py @@ -1087,7 +1087,10 @@ async def get_response( author="agent", tool_call_id=call_id, name=name, - content={"result": _hosted_tool_result(item)[:_HOSTED_TOOL_RESULT_CAP]}, + # Plain string, matching the function-tool response + # path (hooks.on_tool_end) so hosted and function + # tools render identically in the same flow. + content=_hosted_tool_result(item)[:_HOSTED_TOOL_RESULT_CAP], ), ) diff --git a/tests/lib/adk/test_openai_sync.py b/tests/lib/adk/test_openai_sync.py new file mode 100644 index 000000000..4aa32426d --- /dev/null +++ b/tests/lib/adk/test_openai_sync.py @@ -0,0 +1,167 @@ +"""Tests for ``convert_openai_to_agentex_events`` and its helpers. + +Focused on three previously-broken behaviors on the sync OpenAI converter: + +- ``_safe_parse_arguments`` never raises on malformed/non-dict JSON (a bad + tool-args string must not abort the whole turn). +- Every streamed item — text AND reasoning — is closed with a matching + ``StreamTaskMessageDone`` (reasoning messages used to hang open). +- Each new text ``item_id`` gets a fresh index, so a final answer cannot + collide with the preceding reasoning message on reasoning-model streams. +""" + +import types as _types + +import pytest +from openai.types.responses import ResponseTextDeltaEvent, ResponseOutputItemDoneEvent +from openai.types.responses.response_output_message import ResponseOutputMessage +from openai.types.responses.response_reasoning_item import ResponseReasoningItem +from openai.types.responses.response_reasoning_text_delta_event import ResponseReasoningTextDeltaEvent + +from agentex.types.task_message_update import ( + StreamTaskMessageDone, + StreamTaskMessageDelta, + StreamTaskMessageStart, +) +from agentex.lib.adk._modules._openai_sync import ( + _safe_parse_arguments, + convert_openai_to_agentex_events, +) + +# --------------------------------------------------------------------------- +# _safe_parse_arguments +# --------------------------------------------------------------------------- + + +def test_safe_parse_arguments_valid_dict_json(): + assert _safe_parse_arguments('{"a": 1}') == {"a": 1} + + +def test_safe_parse_arguments_empty_and_none(): + assert _safe_parse_arguments("") == {} + assert _safe_parse_arguments(None) == {} + + +def test_safe_parse_arguments_passthrough_dict(): + d = {"already": "dict"} + assert _safe_parse_arguments(d) is d + + +def test_safe_parse_arguments_malformed_preserved_not_raised(): + # A truncated / malformed payload must be preserved, never raise — raising + # here would abort the whole turn before later output is delivered. + assert _safe_parse_arguments('{"a": ') == {"raw": '{"a": '} + + +def test_safe_parse_arguments_non_dict_json_wrapped(): + # Valid JSON that isn't an object is wrapped so the result stays a dict. + assert _safe_parse_arguments("[1, 2]") == {"value": [1, 2]} + assert _safe_parse_arguments("42") == {"value": 42} + + +# --------------------------------------------------------------------------- +# convert_openai_to_agentex_events — reasoning + text sequencing +# --------------------------------------------------------------------------- + + +def _raw(data): + return _types.SimpleNamespace(type="raw_response_event", data=data) + + +async def _stream(events): + for e in events: + yield e + + +async def _collect(events): + return [e async for e in convert_openai_to_agentex_events(_stream(events))] + + +@pytest.mark.asyncio +async def test_reasoning_item_emits_done(): + """A completed reasoning item must yield a matching Done (it used to be skipped).""" + events = [ + _raw( + ResponseReasoningTextDeltaEvent( + type="response.reasoning_text.delta", + item_id="r1", + content_index=0, + delta="thinking", + output_index=0, + sequence_number=1, + ) + ), + _raw( + ResponseOutputItemDoneEvent( + type="response.output_item.done", + item=ResponseReasoningItem(id="r1", type="reasoning", summary=[]), + output_index=0, + sequence_number=2, + ) + ), + ] + out = await _collect(events) + + starts = [e for e in out if isinstance(e, StreamTaskMessageStart)] + dones = [e for e in out if isinstance(e, StreamTaskMessageDone)] + assert len(starts) == 1 + # The reasoning message is now closed instead of hanging open. + assert [d.index for d in dones] == [starts[0].index] + + +@pytest.mark.asyncio +async def test_reasoning_then_text_use_distinct_indices(): + """Final answer text must not reuse the reasoning message's index.""" + events = [ + _raw( + ResponseReasoningTextDeltaEvent( + type="response.reasoning_text.delta", + item_id="r1", + content_index=0, + delta="thinking", + output_index=0, + sequence_number=1, + ) + ), + _raw( + ResponseOutputItemDoneEvent( + type="response.output_item.done", + item=ResponseReasoningItem(id="r1", type="reasoning", summary=[]), + output_index=0, + sequence_number=2, + ) + ), + _raw( + ResponseTextDeltaEvent( + type="response.output_text.delta", + item_id="t1", + content_index=0, + delta="answer", + output_index=1, + sequence_number=3, + logprobs=[], + ) + ), + _raw( + ResponseOutputItemDoneEvent( + type="response.output_item.done", + item=ResponseOutputMessage(id="t1", type="message", role="assistant", status="completed", content=[]), + output_index=1, + sequence_number=4, + ) + ), + ] + out = await _collect(events) + + starts = [e for e in out if isinstance(e, StreamTaskMessageStart)] + assert len(starts) == 2 + reasoning_index, text_index = starts[0].index, starts[1].index + assert reasoning_index != text_index + + # Text deltas route to the text index, not the reasoning index. + text_deltas = [e for e in out if isinstance(e, StreamTaskMessageDelta) and e.delta.type == "text"] + assert text_deltas and all(d.index == text_index for d in text_deltas) + + # Both messages are closed on their own index. + done_indices = sorted(e.index for e in out if isinstance(e, StreamTaskMessageDone)) + assert done_indices == sorted({reasoning_index, text_index}) From 2bdb7a11404ae0268c7364f73eb534c4fae73486 Mon Sep 17 00:00:00 2001 From: Declan Brady Date: Wed, 24 Jun 2026 16:10:03 -0400 Subject: [PATCH 2/7] fix(openai-agents): always coerce tool arguments to a dict MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Greptile follow-up: the _safe_parse_arguments fall-through returned a raw non-dict value (list / scalar / SDK object) when a provider tool passed arguments as something other than a JSON string. ToolRequestContent.arguments is typed Dict[str, object], so that could reject the value and abort the turn — the exact failure the helper exists to prevent. Now serialize SDK objects via model_dump when it yields a dict, otherwise wrap under `value`. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/agentex/lib/adk/_modules/_openai_sync.py | 16 +++++++++----- tests/lib/adk/test_openai_sync.py | 22 ++++++++++++++++++++ 2 files changed, 33 insertions(+), 5 deletions(-) diff --git a/src/agentex/lib/adk/_modules/_openai_sync.py b/src/agentex/lib/adk/_modules/_openai_sync.py index 1d5194c76..ac404bef1 100644 --- a/src/agentex/lib/adk/_modules/_openai_sync.py +++ b/src/agentex/lib/adk/_modules/_openai_sync.py @@ -47,10 +47,11 @@ def _safe_parse_arguments(arguments: Any) -> dict[str, Any]: """Coerce a tool call's ``arguments`` into a dict, tolerating bad JSON. - Mirrors the Temporal streaming model: malformed, truncated, or - provider-specific raw arguments must not abort the whole turn, so a - non-decodable string is preserved under ``raw`` instead of raising and a - non-dict JSON value is wrapped under ``value``. + ``ToolRequestContent.arguments`` is typed ``Dict[str, object]``, so the + result is ALWAYS a dict — a non-dict payload must not abort the turn. + Mirroring the Temporal streaming model: malformed/truncated strings are + preserved under ``raw``, and any other non-dict value (a list, scalar, or + SDK object) is serialized if possible, otherwise wrapped under ``value``. """ if not arguments: return {} @@ -62,7 +63,12 @@ def _safe_parse_arguments(arguments: Any) -> dict[str, Any]: except (json.JSONDecodeError, ValueError): return {"raw": arguments} return parsed if isinstance(parsed, dict) else {"value": parsed} - return arguments + # Non-string, non-dict (e.g. a provider tool passing a list / scalar / SDK + # object). Prefer the object's own dict form; fall back to wrapping it. + dumped = arguments.model_dump() if hasattr(arguments, "model_dump") else None + if isinstance(dumped, dict): + return dumped + return {"value": arguments} def _extract_tool_call_info(tool_call_item: Any) -> tuple[str, str, dict[str, Any]]: diff --git a/tests/lib/adk/test_openai_sync.py b/tests/lib/adk/test_openai_sync.py index 4aa32426d..de2a61db8 100644 --- a/tests/lib/adk/test_openai_sync.py +++ b/tests/lib/adk/test_openai_sync.py @@ -59,6 +59,28 @@ def test_safe_parse_arguments_non_dict_json_wrapped(): assert _safe_parse_arguments("42") == {"value": 42} +def test_safe_parse_arguments_non_string_non_dict_always_returns_dict(): + # A provider tool may pass arguments as a list / scalar / SDK object rather + # than a JSON string. The result must still be a dict so ToolRequestContent + # (arguments: Dict[str, object]) accepts it instead of raising. + assert _safe_parse_arguments([1, 2]) == {"value": [1, 2]} + assert _safe_parse_arguments(7) == {"value": 7} + + class _Args: + def model_dump(self): + return {"q": "hi"} + + assert _safe_parse_arguments(_Args()) == {"q": "hi"} + + # An SDK object whose model_dump is not a dict still degrades to a dict. + class _BadDump: + def model_dump(self): + return ["not", "a", "dict"] + + bad = _BadDump() + assert _safe_parse_arguments(bad) == {"value": bad} + + # --------------------------------------------------------------------------- # convert_openai_to_agentex_events — reasoning + text sequencing # --------------------------------------------------------------------------- From f389c712f517012885d7eeb7993bc1c86aabc82d Mon Sep 17 00:00:00 2001 From: Declan Brady Date: Wed, 24 Jun 2026 16:15:43 -0400 Subject: [PATCH 3/7] fix(harness): record reasoning text on derived reasoning spans MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The SpanDeriver opened reasoning spans with input={} and closed them with output=None, so reasoning/thinking text never reached the trace — every reasoning span showed blank input and output (seen on the Claude Code harness, but the deriver is shared by all harnesses). Accumulate the reasoning text per open index from ReasoningContentDelta / ReasoningSummaryDelta (and seed from any text carried on the Start content for non-streaming harnesses), then record it as the span output on close — matching how tool spans accumulate args_buf and record their result. Empty buffers still close as output=None rather than an empty string. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../lib/core/harness/span_derivation.py | 23 +++++- .../lib/core/harness/test_span_derivation.py | 79 +++++++++++++++++++ 2 files changed, 100 insertions(+), 2 deletions(-) diff --git a/src/agentex/lib/core/harness/span_derivation.py b/src/agentex/lib/core/harness/span_derivation.py index cecb24bcc..c0ed6ee90 100644 --- a/src/agentex/lib/core/harness/span_derivation.py +++ b/src/agentex/lib/core/harness/span_derivation.py @@ -19,6 +19,8 @@ ) from agentex.types.tool_request_content import ToolRequestContent from agentex.types.tool_response_content import ToolResponseContent +from agentex.types.reasoning_content_delta import ReasoningContentDelta +from agentex.types.reasoning_summary_delta import ReasoningSummaryDelta @dataclass @@ -51,6 +53,9 @@ class SpanDeriver: def __init__(self) -> None: self._tool_by_index: dict[int, _ToolReqMeta] = {} self._reasoning_index_open: set[int] = set() + # accumulated reasoning text per open reasoning index, recorded as the + # span output on close (deltas carry the chain-of-thought / summary text). + self._reasoning_text: dict[int, str] = {} # insertion-ordered set of open tool_call_ids (dict keys preserve order) self._open_tool_ids: dict[str, None] = {} @@ -72,8 +77,10 @@ def flush(self) -> list[SpanSignal]: signals.append(CloseSpan(key=tcid, output=None, is_complete=False)) self._open_tool_ids.clear() for idx in sorted(self._reasoning_index_open): - signals.append(CloseSpan(key=f"reasoning:{idx}", output=None, is_complete=False)) + text = self._reasoning_text.pop(idx, "") + signals.append(CloseSpan(key=f"reasoning:{idx}", output=text or None, is_complete=False)) self._reasoning_index_open.clear() + self._reasoning_text.clear() return signals def _on_start(self, event: StreamTaskMessageStart) -> list[SpanSignal]: @@ -90,6 +97,11 @@ def _on_start(self, event: StreamTaskMessageStart) -> list[SpanSignal]: return [] if content.type == "reasoning": self._reasoning_index_open.add(idx) + # Seed from any text already on the Start content — non-streaming + # harnesses may carry the full reasoning up front; deltas append. + summary = getattr(content, "summary", None) or [] + body = getattr(content, "content", None) or [] + self._reasoning_text[idx] = "".join([*summary, *body]) return [OpenSpan(key=f"reasoning:{idx}", kind="reasoning", name="reasoning", input={})] return [] @@ -102,6 +114,12 @@ def _on_delta(self, event: StreamTaskMessageDelta) -> list[SpanSignal]: meta = self._tool_by_index.get(idx) if meta is not None and delta.arguments_delta: meta.args_buf += delta.arguments_delta + elif isinstance(delta, ReasoningContentDelta): + if idx in self._reasoning_index_open and delta.content_delta: + self._reasoning_text[idx] = self._reasoning_text.get(idx, "") + delta.content_delta + elif isinstance(delta, ReasoningSummaryDelta): + if idx in self._reasoning_index_open and delta.summary_delta: + self._reasoning_text[idx] = self._reasoning_text.get(idx, "") + delta.summary_delta return [] def _on_full(self, event: StreamTaskMessageFull) -> list[SpanSignal]: @@ -150,5 +168,6 @@ def _on_done(self, event: StreamTaskMessageDone) -> list[SpanSignal]: return [OpenSpan(key=meta.tool_call_id, kind="tool", name=meta.name, input=args)] if idx in self._reasoning_index_open: self._reasoning_index_open.discard(idx) - return [CloseSpan(key=f"reasoning:{idx}", output=None, is_complete=True)] + text = self._reasoning_text.pop(idx, "") + return [CloseSpan(key=f"reasoning:{idx}", output=text or None, is_complete=True)] return [] diff --git a/tests/lib/core/harness/test_span_derivation.py b/tests/lib/core/harness/test_span_derivation.py index 51e2ede2c..6376dc0c6 100644 --- a/tests/lib/core/harness/test_span_derivation.py +++ b/tests/lib/core/harness/test_span_derivation.py @@ -10,6 +10,8 @@ ) from agentex.types.tool_request_content import ToolRequestContent from agentex.types.tool_response_content import ToolResponseContent +from agentex.types.reasoning_content_delta import ReasoningContentDelta +from agentex.types.reasoning_summary_delta import ReasoningSummaryDelta from agentex.lib.core.harness.span_derivation import SpanDeriver @@ -98,9 +100,86 @@ def test_reasoning_opens_on_start_closes_on_done(): ] sigs = _signals(d, events) assert sigs[0] == OpenSpan(key="reasoning:0", kind="reasoning", name="reasoning", input={}) + # No deltas -> nothing to record, so output stays None (not an empty string). assert sigs[1] == CloseSpan(key="reasoning:0", output=None, is_complete=True) +def test_reasoning_content_deltas_recorded_as_output(): + """The chain-of-thought streamed via ReasoningContentDelta lands on the + reasoning span's output (previously dropped, leaving the span blank).""" + d = SpanDeriver() + events = [ + StreamTaskMessageStart( + type="start", index=0, content=ReasoningContent(type="reasoning", author="agent", summary=[], content=[]) + ), + StreamTaskMessageDelta( + type="delta", + index=0, + delta=ReasoningContentDelta(type="reasoning_content", content_index=0, content_delta="Let me "), + ), + StreamTaskMessageDelta( + type="delta", + index=0, + delta=ReasoningContentDelta(type="reasoning_content", content_index=0, content_delta="think."), + ), + StreamTaskMessageDone(type="done", index=0), + ] + sigs = _signals(d, events) + assert sigs[0] == OpenSpan(key="reasoning:0", kind="reasoning", name="reasoning", input={}) + assert sigs[1] == CloseSpan(key="reasoning:0", output="Let me think.", is_complete=True) + + +def test_reasoning_summary_deltas_recorded_as_output(): + """Reasoning-model summary deltas (o-series) also land on the span output.""" + d = SpanDeriver() + events = [ + StreamTaskMessageStart( + type="start", index=0, content=ReasoningContent(type="reasoning", author="agent", summary=[], content=[]) + ), + StreamTaskMessageDelta( + type="delta", + index=0, + delta=ReasoningSummaryDelta(type="reasoning_summary", summary_index=0, summary_delta="Summary text"), + ), + StreamTaskMessageDone(type="done", index=0), + ] + sigs = _signals(d, events) + assert sigs[1] == CloseSpan(key="reasoning:0", output="Summary text", is_complete=True) + + +def test_reasoning_text_seeded_from_start_content(): + """A non-streaming harness that carries the full thinking on the Start + content still records it as output even with no deltas.""" + d = SpanDeriver() + events = [ + StreamTaskMessageStart( + type="start", + index=0, + content=ReasoningContent(type="reasoning", author="agent", summary=[], content=["full thought"]), + ), + StreamTaskMessageDone(type="done", index=0), + ] + sigs = _signals(d, events) + assert sigs[1] == CloseSpan(key="reasoning:0", output="full thought", is_complete=True) + + +def test_reasoning_unclosed_flushes_with_text(): + """An unclosed reasoning span flushes incomplete but still carries its text.""" + d = SpanDeriver() + events = [ + StreamTaskMessageStart( + type="start", index=0, content=ReasoningContent(type="reasoning", author="agent", summary=[], content=[]) + ), + StreamTaskMessageDelta( + type="delta", + index=0, + delta=ReasoningContentDelta(type="reasoning_content", content_index=0, content_delta="partial"), + ), + ] + sigs = _signals(d, events) + assert sigs[-1] == CloseSpan(key="reasoning:0", output="partial", is_complete=False) + + def test_parallel_tools_pair_by_tool_call_id(): d = SpanDeriver() events = [ From 01fbc847fadfb3f8dc81c5135eb31ad7f3eb1034 Mon Sep 17 00:00:00 2001 From: Declan Brady Date: Wed, 24 Jun 2026 16:42:07 -0400 Subject: [PATCH 4/7] fix(claude-code): dedup streamed blocks by content, not block index MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The golden agent emitted duplicate text messages: a single streamed assistant message (e.g. thinking + text) can materialise as SEPARATE `assistant` envelopes, and the converter reset its per-block streamed-index set after EACH materialised envelope. The thinking envelope (first) deduped fine, but the reset then wiped the set, so the text block in the second envelope lost its "already streamed" marker and was re-emitted. Numeric block indexes can't distinguish "already-streamed text materialising in a later envelope" (skip) from "a new turn's non-streamed text at the same index" (emit) — both arrive as a lone block at index 0. Switch to content-based dedup: record the full text of each streamed text/thinking block on content_block_stop, and skip a materialised block whose text matches (consuming one entry so a genuinely repeated later block still emits). Removes the per-envelope reset and the fragile pending/once-guard index machinery. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../lib/adk/_modules/_claude_code_sync.py | 114 ++++++++---------- tests/lib/adk/test_claude_code_sync.py | 42 +++++++ 2 files changed, 90 insertions(+), 66 deletions(-) diff --git a/src/agentex/lib/adk/_modules/_claude_code_sync.py b/src/agentex/lib/adk/_modules/_claude_code_sync.py index 4e25503cf..0305a1ea8 100644 --- a/src/agentex/lib/adk/_modules/_claude_code_sync.py +++ b/src/agentex/lib/adk/_modules/_claude_code_sync.py @@ -98,18 +98,15 @@ async def convert_claude_code_to_agentex_events( _text_open = False _text_buf = "" _text_index: int | None = None - # Track which assistant-message block indices were already streamed via - # stream_event triples. Those blocks must not be re-emitted when the full - # assistant message arrives. Reset at each message boundary (see below) so a - # later turn's block indices don't collide with an earlier turn's. - _streamed_block_indexes: set[int] = set() - # Once-guard so a thinking block's pending index is claimed on its first - # thinking_delta only. Reset per turn alongside _streamed_block_indexes. - _saw_thinking_stream = False - # For deferred ReasoningStarted: if a content_block_start(thinking) arrives - # but no thinking_delta ever follows, the final assistant block's thinking - # field fills the reasoning content instead. - _pending_thinking_block_index: int | None = None + # Full text of each block already delivered via stream_event deltas, so the + # materialised assistant envelope does not re-emit it. Matched by CONTENT, + # not block index: a single streamed message can arrive as several assistant + # envelopes (e.g. a thinking block, then the text block), and the per-block + # numeric index does not survive that split while the text does. Each match + # is consumed (one entry removed) so a genuinely repeated later block — a new + # turn that happens to emit identical text — is still delivered. + _streamed_texts: list[str] = [] + _streamed_thinkings: list[str] = [] async for raw in lines: if not raw: @@ -138,43 +135,46 @@ async def convert_claude_code_to_agentex_events( if not isinstance(blocks, list): blocks = [blocks] - for idx, block in enumerate(blocks): + for block in blocks: if not isinstance(block, dict): continue block_type = block.get("type", "") if block_type == "text": - # Skip only the specific blocks already delivered via - # stream_event deltas (per-block, not a turn-wide latch). - if idx in _streamed_block_indexes: - continue text = block.get("text", "") - if text: - msg_index = next_index - next_index += 1 - yield StreamTaskMessageStart( - type="start", - index=msg_index, - content=TextContent( - type="text", - author="agent", - content="", - ), - ) - yield StreamTaskMessageDelta( - type="delta", - index=msg_index, - delta=TextDelta(type="text", text_delta=text), - ) - yield StreamTaskMessageDone(type="done", index=msg_index) + if not text: + continue + # Skip blocks already delivered via stream_event deltas, + # matched by content (see _streamed_texts). + if text in _streamed_texts: + _streamed_texts.remove(text) + continue + msg_index = next_index + next_index += 1 + yield StreamTaskMessageStart( + type="start", + index=msg_index, + content=TextContent( + type="text", + author="agent", + content="", + ), + ) + yield StreamTaskMessageDelta( + type="delta", + index=msg_index, + delta=TextDelta(type="text", text_delta=text), + ) + yield StreamTaskMessageDone(type="done", index=msg_index) elif block_type == "thinking": - # Skip only the specific blocks already delivered via - # stream_event deltas (per-block, not a turn-wide latch). - if idx in _streamed_block_indexes: - continue thinking_text = block.get("thinking", "") if thinking_text: + # Skip blocks already delivered via stream_event deltas, + # matched by content (see _streamed_thinkings). + if thinking_text in _streamed_thinkings: + _streamed_thinkings.remove(thinking_text) + continue summary = _extract_summary(thinking_text) msg_index = next_index next_index += 1 @@ -243,20 +243,12 @@ async def convert_claude_code_to_agentex_events( ), ) - # End of a materialised message: reset per-turn streaming dedup state - # so the next turn's stream_event indices start clean. Without this, - # a block index streamed in an earlier turn would linger in the set - # and silently drop a later turn's non-streamed block at that index. - _streamed_block_indexes = set() - _saw_thinking_stream = False - # ----------------------------------------------------------------------- # stream_event — incremental streaming deltas # ----------------------------------------------------------------------- elif evt_type == "stream_event": se = evt.get("event") or {} se_type = se.get("type", "") - block_index = se.get("index") if se_type == "content_block_start": block = se.get("content_block") or {} @@ -265,11 +257,6 @@ async def convert_claude_code_to_agentex_events( if btype == "thinking": _thinking_open = True _thinking_buf = "" - # Defer marking the block as streamed until we actually - # receive a thinking_delta. Some configurations emit a - # thinking block_start but no deltas — in that case we want - # the final assistant-message handler to fill the text. - _pending_thinking_block_index = block_index if isinstance(block_index, int) else None msg_index = next_index next_index += 1 _thinking_index = msg_index @@ -288,8 +275,6 @@ async def convert_claude_code_to_agentex_events( elif btype == "text": _text_open = True _text_buf = "" - if isinstance(block_index, int): - _streamed_block_indexes.add(block_index) msg_index = next_index next_index += 1 _text_index = msg_index @@ -310,12 +295,6 @@ async def convert_claude_code_to_agentex_events( if dtype == "thinking_delta": chunk = delta.get("thinking", "") if chunk and _thinking_open: - if not _saw_thinking_stream: - _saw_thinking_stream = True - # Now mark the block as claimed so the assistant - # message handler won't re-emit it. - if _pending_thinking_block_index is not None: - _streamed_block_indexes.add(_pending_thinking_block_index) _thinking_buf += chunk if _thinking_index is not None: yield StreamTaskMessageDelta( @@ -342,18 +321,21 @@ async def convert_claude_code_to_agentex_events( elif se_type == "content_block_stop": if _thinking_open: _thinking_open = False + # Record the streamed thinking so the materialised assistant + # envelope doesn't re-emit it. Skip empties: a block_start with + # no deltas leaves the assistant envelope free to fill the text. + if _thinking_buf: + _streamed_thinkings.append(_thinking_buf) _thinking_buf = "" - _pending_thinking_block_index = None - # Reset the once-guard per thinking block: a turn can stream a - # second thinking block, and without this the guard stays True, - # the second block's index is never claimed, and the final - # assistant envelope re-emits it (duplicate Start/Delta/Done). - _saw_thinking_stream = False if _thinking_index is not None: yield StreamTaskMessageDone(type="done", index=_thinking_index) _thinking_index = None elif _text_open: _text_open = False + # Record the streamed text for content-based dedup against the + # materialised assistant envelope (see _streamed_texts). + if _text_buf: + _streamed_texts.append(_text_buf) _text_buf = "" if _text_index is not None: yield StreamTaskMessageDone(type="done", index=_text_index) diff --git a/tests/lib/adk/test_claude_code_sync.py b/tests/lib/adk/test_claude_code_sync.py index 6dd36d973..d812ea9ea 100644 --- a/tests/lib/adk/test_claude_code_sync.py +++ b/tests/lib/adk/test_claude_code_sync.py @@ -140,6 +140,48 @@ async def test_streamed_text_not_re_emitted_by_assistant_block(self): text_starts = [e for e in out if isinstance(e, StreamTaskMessageStart) and isinstance(e.content, TextContent)] assert len(text_starts) == 1, "Text block must not be emitted twice" + async def test_streamed_message_split_across_assistant_envelopes_not_duplicated(self): + """Regression: one streamed message (thinking + text) can materialise as + SEPARATE assistant envelopes. Content-based dedup must skip both streamed + blocks even though the text arrives in its own later envelope — an earlier + index-based scheme re-emitted the text (duplicate).""" + envelopes = [ + # Streamed: thinking at block index 0, then text at block index 1. + { + "type": "stream_event", + "event": {"type": "content_block_start", "index": 0, "content_block": {"type": "thinking"}}, + }, + { + "type": "stream_event", + "event": { + "type": "content_block_delta", + "index": 0, + "delta": {"type": "thinking_delta", "thinking": "ponder"}, + }, + }, + {"type": "stream_event", "event": {"type": "content_block_stop", "index": 0}}, + { + "type": "stream_event", + "event": {"type": "content_block_start", "index": 1, "content_block": {"type": "text"}}, + }, + { + "type": "stream_event", + "event": {"type": "content_block_delta", "index": 1, "delta": {"type": "text_delta", "text": "answer"}}, + }, + {"type": "stream_event", "event": {"type": "content_block_stop", "index": 1}}, + # Materialised as two separate assistant envelopes (thinking alone at + # idx 0, then text alone at idx 0) — the shape that caused duplicates. + {"type": "assistant", "message": {"content": [{"type": "thinking", "thinking": "ponder"}]}}, + {"type": "assistant", "message": {"content": [{"type": "text", "text": "answer"}]}}, + ] + out = await _collect(convert_claude_code_to_agentex_events(_aiter(envelopes))) + text_starts = [e for e in out if isinstance(e, StreamTaskMessageStart) and isinstance(e.content, TextContent)] + reasoning_starts = [ + e for e in out if isinstance(e, StreamTaskMessageStart) and isinstance(e.content, ReasoningContent) + ] + assert len(text_starts) == 1, "Streamed text must not be re-emitted by its own materialised envelope" + assert len(reasoning_starts) == 1, "Streamed thinking must not be re-emitted either" + async def test_later_turn_non_streamed_text_not_dropped(self): """A non-streamed text block in a later turn must not be dropped because an earlier turn streamed a block at the same index.""" From 90182cb733b16367af05427909607aab42f6f05e Mon Sep 17 00:00:00 2001 From: Declan Brady Date: Wed, 24 Jun 2026 16:46:34 -0400 Subject: [PATCH 5/7] docs(migration): add guide for moving off the legacy claude_agents plugin MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Document migrating from the original Temporal `claude_agents` plugin (`run_claude_agent_activity` + bespoke streaming/tracing) to the unified harness tap (`ClaudeCodeTurn` over the CLI stream-json stdout, delivered via UnifiedEmitter), which gets central span derivation — tool AND reasoning spans — and the shared delivery path. Also records the two Claude Code defect fixes (reasoning span text, duplicate-text dedup) in the guide, and adds a deprecation note to the plugin module docstring pointing at the tap. No code removal — the plugin still has consumers (090 tutorial, eval_dashboard_agent). Co-Authored-By: Claude Opus 4.8 (1M context) --- adk/docs/migration-0.16.0.md | 72 ++++++++++++++++++- .../plugins/claude_agents/__init__.py | 11 +++ 2 files changed, 82 insertions(+), 1 deletion(-) diff --git a/adk/docs/migration-0.16.0.md b/adk/docs/migration-0.16.0.md index 6f808a0f1..b76da55ba 100644 --- a/adk/docs/migration-0.16.0.md +++ b/adk/docs/migration-0.16.0.md @@ -197,6 +197,76 @@ path. No API change — behavior only. function-tool response path (`on_tool_end`) so hosted and function tools render identically within the same flow. +5. **Reasoning text now appears in derived spans.** `SpanDeriver` opened reasoning + spans with empty input and closed them with `output=None`, so reasoning/thinking + text never reached the trace (spans showed blank — read as "0 reasoning traces"). + It now accumulates the `ReasoningContentDelta` / `ReasoningSummaryDelta` text (and + any text seeded on the Start content) and records it as the span output. Affects + every harness that streams reasoning, including the Claude Code tap. + +6. **Claude Code: no more duplicate text messages.** The `stream-json` converter + deduped streamed-vs-materialized blocks by numeric block index and reset that + state after every materialized `assistant` envelope. A single streamed message + that materializes as several envelopes (thinking, then text) lost the dedup + marker between envelopes and re-emitted the text. Dedup is now **content-based** + (match the streamed block's text, consume once), which a numeric index cannot do + reliably. + > Action: if you adopted `OpenAITurn` for **reasoning models** (o1/o3/gpt-5) on > the sync path before these fixes, upgrade — fixes 2 and 3 are required for -> correct reasoning rendering. +> correct reasoning rendering. Claude Code agents on the unified harness tap should +> upgrade for fixes 5 and 6. + +--- + +## 6. Legacy Temporal `claude_agents` plugin → unified harness tap + +`agentex.lib.core.temporal.plugins.claude_agents` (`run_claude_agent_activity`, +`create_streaming_hooks`, `TemporalStreamingHooks`, `ClaudeMessageHandler`) is the +**original** Claude Code integration: it drives the Python `claude-agent-sdk` +directly and hand-rolls its own streaming + tracing. It is **superseded** by the +unified harness tap and slated for removal in a future release. It still works +today, so this migration is **recommended, not yet required** — but new Claude Code +agents should use the tap, and existing ones should plan to move. + +Why migrate: the tap routes Claude Code through the same canonical +`StreamTaskMessage*` stream as every other harness, so it gets central span +derivation (tool **and** reasoning spans), the single delivery path +(`UnifiedEmitter`), and fixes like the two above for free. The legacy plugin does +not derive reasoning spans at all and duplicates the streaming/tracing logic. + +**Before — legacy plugin activity:** + +```python +from agentex.lib.core.temporal.plugins.claude_agents import run_claude_agent_activity + +# In the workflow: +result = await workflow.execute_activity( + run_claude_agent_activity, + args=[prompt, workspace_path, allowed_tools, ...], + start_to_close_timeout=..., +) +``` + +**After — unified harness tap.** Run the CLI yourself (`claude -p --output-format +stream-json --include-partial-messages`), wrap its stdout in `ClaudeCodeTurn`, and +deliver through `UnifiedEmitter`: + +```python +from agentex.lib.adk import ClaudeCodeTurn, UnifiedEmitter + +# `stdout_lines` is an async iterator of the CLI's stdout lines (raw JSON strings +# or pre-parsed dicts) — e.g. read from sandbox.exec() / a subprocess. +turn = ClaudeCodeTurn(stdout_lines) + +emitter = UnifiedEmitter(task_id=task_id, trace_id=trace_id, parent_span_id=parent_span_id) +result = await emitter.auto_send_turn(turn, created_at=workflow.now()) +# result.final_text — last text segment +# result.usage — TurnUsage (tokens, cost, num_reasoning_blocks, ...) +``` + +The golden agent is the reference implementation +(`teams/sgp/agents/golden_agent/project/harness/`): it spawns the CLI in a sandbox, +yields stdout lines into `ClaudeCodeTurn`, and drives `auto_send_turn`. Known +remaining consumers to migrate: the `090_claude_agents_sdk_mvp` tutorial and the +`eval_dashboard_agent`. diff --git a/src/agentex/lib/core/temporal/plugins/claude_agents/__init__.py b/src/agentex/lib/core/temporal/plugins/claude_agents/__init__.py index fd40545ec..1e9ee694a 100644 --- a/src/agentex/lib/core/temporal/plugins/claude_agents/__init__.py +++ b/src/agentex/lib/core/temporal/plugins/claude_agents/__init__.py @@ -1,5 +1,16 @@ """Claude Agents SDK integration with Temporal. +.. deprecated:: + This is the original Claude Code integration: it drives the Python + ``claude-agent-sdk`` directly and hand-rolls its own streaming + tracing + (and does not derive reasoning spans). It is superseded by the unified + harness tap (``agentex.lib.adk.ClaudeCodeTurn`` over the ``claude -p + --output-format stream-json`` CLI stdout, delivered via ``UnifiedEmitter``), + which routes Claude Code through the same canonical ``StreamTaskMessage*`` + stream as every other harness. It still works, but new agents should use the + tap and existing ones should plan to migrate; see + ``adk/docs/migration-0.16.0.md`` for the before/after. + This plugin provides integration between Claude Agents SDK and AgentEx's Temporal-based orchestration platform. From 2d66f9b8d194dd93bd419bdb56fd0ebd2ba7bee5 Mon Sep 17 00:00:00 2001 From: Declan Brady Date: Wed, 24 Jun 2026 17:02:58 -0400 Subject: [PATCH 6/7] fix(claude-code): dedup interleaved materialized blocks mid-stream Claude Code can emit the materialized `assistant` envelope for a content block WHILE that block is still streaming (the envelope arrives between two content_block_delta events, before content_block_stop). At that point the streamed block's buffer has not yet been recorded in _streamed_texts / _streamed_thinkings, so content-recorded dedup misses it and the block is emitted twice (observed as duplicate reasoning messages in the golden agent). Add a second dedup condition: skip a materialized text/thinking block when a streamed block of the same type is still open and its partial buffer is a prefix of the materialized full text. Prefix-match confirms it's the same block without dropping a genuinely different one. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../lib/adk/_modules/_claude_code_sync.py | 18 +++++++--- tests/lib/adk/test_claude_code_sync.py | 36 +++++++++++++++++++ 2 files changed, 50 insertions(+), 4 deletions(-) diff --git a/src/agentex/lib/adk/_modules/_claude_code_sync.py b/src/agentex/lib/adk/_modules/_claude_code_sync.py index 0305a1ea8..93a639118 100644 --- a/src/agentex/lib/adk/_modules/_claude_code_sync.py +++ b/src/agentex/lib/adk/_modules/_claude_code_sync.py @@ -144,11 +144,17 @@ async def convert_claude_code_to_agentex_events( text = block.get("text", "") if not text: continue - # Skip blocks already delivered via stream_event deltas, - # matched by content (see _streamed_texts). + # Skip blocks already delivered via stream_event deltas. Two + # cases: (1) the streamed block already finished — its full + # text is recorded in _streamed_texts; (2) the materialised + # envelope arrives INTERLEAVED, mid-stream, before the streamed + # block's content_block_stop records its buffer — the still-open + # block's partial buffer is a prefix of this full text. if text in _streamed_texts: _streamed_texts.remove(text) continue + if _text_open and _text_buf and text.startswith(_text_buf): + continue msg_index = next_index next_index += 1 yield StreamTaskMessageStart( @@ -170,11 +176,15 @@ async def convert_claude_code_to_agentex_events( elif block_type == "thinking": thinking_text = block.get("thinking", "") if thinking_text: - # Skip blocks already delivered via stream_event deltas, - # matched by content (see _streamed_thinkings). + # Skip blocks already delivered via stream_event deltas. + # Same two cases as text above: finished streamed block + # (recorded), or an interleaved materialised envelope whose + # text the still-open streamed buffer is a prefix of. if thinking_text in _streamed_thinkings: _streamed_thinkings.remove(thinking_text) continue + if _thinking_open and _thinking_buf and thinking_text.startswith(_thinking_buf): + continue summary = _extract_summary(thinking_text) msg_index = next_index next_index += 1 diff --git a/tests/lib/adk/test_claude_code_sync.py b/tests/lib/adk/test_claude_code_sync.py index d812ea9ea..5a78acaf7 100644 --- a/tests/lib/adk/test_claude_code_sync.py +++ b/tests/lib/adk/test_claude_code_sync.py @@ -182,6 +182,42 @@ async def test_streamed_message_split_across_assistant_envelopes_not_duplicated( assert len(text_starts) == 1, "Streamed text must not be re-emitted by its own materialised envelope" assert len(reasoning_starts) == 1, "Streamed thinking must not be re-emitted either" + async def test_interleaved_materialized_block_not_duplicated(self): + """Regression: the materialised `assistant` envelope can arrive MID-stream + (before the streamed block's content_block_stop). Content-recorded dedup + hasn't fired yet, so the still-open block's partial buffer is prefix-matched + against the materialised full text to suppress the duplicate.""" + envelopes = [ + { + "type": "stream_event", + "event": {"type": "content_block_start", "index": 0, "content_block": {"type": "thinking"}}, + }, + { + "type": "stream_event", + "event": { + "type": "content_block_delta", + "index": 0, + "delta": {"type": "thinking_delta", "thinking": "I"}, + }, + }, + # Materialised envelope interleaved before content_block_stop. + {"type": "assistant", "message": {"content": [{"type": "thinking", "thinking": "I need to load tools."}]}}, + { + "type": "stream_event", + "event": { + "type": "content_block_delta", + "index": 0, + "delta": {"type": "thinking_delta", "thinking": " need to load tools."}, + }, + }, + {"type": "stream_event", "event": {"type": "content_block_stop", "index": 0}}, + ] + out = await _collect(convert_claude_code_to_agentex_events(_aiter(envelopes))) + reasoning_starts = [ + e for e in out if isinstance(e, StreamTaskMessageStart) and isinstance(e.content, ReasoningContent) + ] + assert len(reasoning_starts) == 1, "Interleaved materialised reasoning must not duplicate the streamed block" + async def test_later_turn_non_streamed_text_not_dropped(self): """A non-streamed text block in a later turn must not be dropped because an earlier turn streamed a block at the same index.""" From 5e8759430050d05b1f4d3db9fbbb55080d5a36f2 Mon Sep 17 00:00:00 2001 From: Declan Brady Date: Wed, 24 Jun 2026 17:13:06 -0400 Subject: [PATCH 7/7] fix(harness): coerce non-dict span input/output so spans aren't dropped MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The SGP spans API requires a span's input/output to be an object: a scalar or string payload is rejected with a 422 ("Input should be a valid dictionary") and the async processor drops the span. The reasoning-span fix records the chain-of-thought as a plain-string output, so reasoning spans were silently dropped — observed as "0 reasoning traces" in the golden agent while tool spans (dict output) survived. Some harnesses' tool results are also plain strings and would hit the same 422. Coerce at the tracer boundary: wrap any non-dict input/output under a single key ({"output": ...} / {"input": ...}); dicts and None pass through unchanged. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/agentex/lib/core/harness/tracer.py | 19 +++++++++++-- tests/lib/core/harness/test_auto_send.py | 3 ++- tests/lib/core/harness/test_tracer.py | 27 ++++++++++++++++++- tests/lib/core/harness/test_yield_delivery.py | 3 ++- 4 files changed, 47 insertions(+), 5 deletions(-) diff --git a/src/agentex/lib/core/harness/tracer.py b/src/agentex/lib/core/harness/tracer.py index 0c6167b76..bf37bad30 100644 --- a/src/agentex/lib/core/harness/tracer.py +++ b/src/agentex/lib/core/harness/tracer.py @@ -16,6 +16,21 @@ logger = logging.getLogger(__name__) +def _as_span_payload(value: Any, *, key: str) -> Any: + """Coerce a span input/output payload into a dict. + + The SGP spans API requires ``input`` and ``output`` to be objects: a scalar + or string is rejected with a 422 and the span is dropped by the async + processor. The SpanDeriver legitimately produces non-dict payloads — the + reasoning span's output is the chain-of-thought string, and some harnesses' + tool results are plain strings — so wrap anything that isn't already a dict + (``None`` passes through unchanged so an absent payload stays absent). + """ + if value is None or isinstance(value, dict): + return value + return {key: value} + + class SpanTracer: """Opens/closes adk.tracing child spans in response to span signals. @@ -60,7 +75,7 @@ async def handle(self, signal: SpanSignal) -> None: span = await self._tracing.start_span( trace_id=self.trace_id, name=signal.name, - input=signal.input, + input=_as_span_payload(signal.input, key="input"), parent_id=self.parent_span_id, task_id=self.task_id, ) @@ -73,7 +88,7 @@ async def handle(self, signal: SpanSignal) -> None: # The real TracingModule.end_span signature is: # end_span(trace_id, span, start_to_close_timeout, heartbeat_timeout, retry_policy) # It does not accept an output= kwarg. - span.output = signal.output + span.output = _as_span_payload(signal.output, key="output") # Tool failure status (ToolResponseContent.is_error) is recorded # on span.data when the harness reports one; Span has no dedicated # error field. None means no status was reported, so leave data alone. diff --git a/tests/lib/core/harness/test_auto_send.py b/tests/lib/core/harness/test_auto_send.py index 764dae8b3..8133a488c 100644 --- a/tests/lib/core/harness/test_auto_send.py +++ b/tests/lib/core/harness/test_auto_send.py @@ -218,7 +218,8 @@ async def test_auto_send_derives_tool_spans_via_tracer(): assert result.final_text == "" assert fake_tracing.started_names == ["Bash"] - assert fake_tracing.ended_outputs == ["ok"] + # String tool output is wrapped in a dict (SGP spans require an object). + assert fake_tracing.ended_outputs == [{"output": "ok"}] # --------------------------------------------------------------------------- diff --git a/tests/lib/core/harness/test_tracer.py b/tests/lib/core/harness/test_tracer.py index b3d9002c4..9bd17b90c 100644 --- a/tests/lib/core/harness/test_tracer.py +++ b/tests/lib/core/harness/test_tracer.py @@ -15,7 +15,32 @@ async def test_open_then_close_starts_and_ends_span(): await tracer.handle(OpenSpan(key="call_1", kind="tool", name="Bash", input={"cmd": "ls"})) await tracer.handle(CloseSpan(key="call_1", output="files", is_complete=True)) assert fake.started == [("Bash", "p1", {"cmd": "ls"})] - assert fake.ended == [("Bash", "files")] + # A plain-string output is wrapped in a dict (SGP spans require an object). + assert fake.ended == [("Bash", {"output": "files"})] + + +@pytest.mark.asyncio +async def test_non_dict_payloads_are_wrapped_in_a_dict(): + """SGP spans reject scalar input/output with a 422; the tracer wraps any + non-dict payload so reasoning spans (string output) are not dropped.""" + fake = FakeTracing() + tracer = SpanTracer(trace_id="t1", parent_span_id="p1", tracing=fake) + await tracer.handle(OpenSpan(key="reasoning:0", kind="reasoning", name="reasoning", input={})) + await tracer.handle(CloseSpan(key="reasoning:0", output="chain of thought", is_complete=True)) + # Empty-dict input stays a dict; string output is wrapped. + assert fake.started == [("reasoning", "p1", {})] + assert fake.ended == [("reasoning", {"output": "chain of thought"})] + + +@pytest.mark.asyncio +async def test_dict_and_none_payloads_pass_through_unchanged(): + fake = FakeTracing() + tracer = SpanTracer(trace_id="t1", parent_span_id="p1", tracing=fake) + await tracer.handle(OpenSpan(key="c", kind="tool", name="T", input={"a": 1})) + await tracer.handle(CloseSpan(key="c", output={"result": "x"}, is_complete=True)) + await tracer.handle(OpenSpan(key="d", kind="tool", name="U", input={})) + await tracer.handle(CloseSpan(key="d", output=None, is_complete=False)) + assert fake.ended == [("T", {"result": "x"}), ("U", None)] @pytest.mark.asyncio diff --git a/tests/lib/core/harness/test_yield_delivery.py b/tests/lib/core/harness/test_yield_delivery.py index ef3861a16..21c93a95c 100644 --- a/tests/lib/core/harness/test_yield_delivery.py +++ b/tests/lib/core/harness/test_yield_delivery.py @@ -42,7 +42,8 @@ async def test_yield_passes_events_through_and_traces(): out = [e async for e in yield_events(_gen(events), tracer=tracer)] assert out == events # passthrough unchanged assert fake.started_names == ["Bash"] # span derived + opened - assert fake.ended_outputs == ["ok"] # span closed with response + # String tool output is wrapped in a dict (SGP spans require an object). + assert fake.ended_outputs == [{"output": "ok"}] # span closed with response @pytest.mark.asyncio