From a75aa68668f3577a3616ef170586c650cd401a0d Mon Sep 17 00:00:00 2001 From: Endre Berki Date: Thu, 25 Jun 2026 16:30:23 +0200 Subject: [PATCH 1/6] fix(streaming): StreamTaskMessageFull closes the coalescing buffer A StreamTaskMessageFull ends the stream and marks the context done, but it did not close the coalescing buffer. __aexit__'s close() then short-circuits on _is_closed and never stops the buffer's ticker, leaving an orphaned background task. Buffered deltas could also publish after the terminal Full, which a consumer treating Full as final reads as a stale duplicate tail. Drain and stop the buffer before publishing the Full (deltas -> Full ordering), and reap the buffer in close() before the _is_closed short-circuit so it can't be orphaned on any path. Co-Authored-By: Claude Opus 4.8 --- .../lib/core/services/adk/streaming.py | 21 +++++-- tests/lib/core/services/adk/test_streaming.py | 59 ++++++++++++++++++- 2 files changed, 74 insertions(+), 6 deletions(-) diff --git a/src/agentex/lib/core/services/adk/streaming.py b/src/agentex/lib/core/services/adk/streaming.py index 7215f084c..deacda88b 100644 --- a/src/agentex/lib/core/services/adk/streaming.py +++ b/src/agentex/lib/core/services/adk/streaming.py @@ -420,15 +420,17 @@ async def close(self) -> TaskMessage: if not self.task_message: raise ValueError("Context not properly initialized - no task message") - if self._is_closed: - return self.task_message # Already done - - # Drain any buffered deltas before announcing DONE so consumers see the - # full sequence in order. + # Reap the buffer (stopping its ticker) before the _is_closed + # short-circuit, so a context already marked done by a Full update can't + # leave the ticker orphaned. Draining here also lets consumers see the + # full delta sequence in order before DONE. if self._buffer is not None: await self._buffer.close() self._buffer = None + if self._is_closed: + return self.task_message # Already done (buffer reaped above) + # Send the DONE event done_event = StreamTaskMessageDone( parent_task_message=self.task_message, @@ -486,6 +488,15 @@ async def stream_update(self, update: TaskMessageUpdate) -> TaskMessageUpdate | await self._buffer.add(update) return update + # A Full ends the stream and supersedes buffered deltas. Drain and stop + # the buffer BEFORE publishing the Full, so leftover deltas land in order + # (deltas -> Full) instead of trailing the terminal Full as a stale + # duplicate tail. This also stops the ticker, which would otherwise be + # orphaned when __aexit__'s close() short-circuits on _is_closed. + if isinstance(update, StreamTaskMessageFull) and self._buffer is not None: + await self._buffer.close() + self._buffer = None + result = await self._streaming_service.stream_update(update) if isinstance(update, StreamTaskMessageDone): diff --git a/tests/lib/core/services/adk/test_streaming.py b/tests/lib/core/services/adk/test_streaming.py index b07c55f74..f74b3ad32 100644 --- a/tests/lib/core/services/adk/test_streaming.py +++ b/tests/lib/core/services/adk/test_streaming.py @@ -22,7 +22,10 @@ ToolResponseDelta, ReasoningSummaryDelta, ) -from agentex.types.task_message_update import StreamTaskMessageDelta +from agentex.types.task_message_update import ( + StreamTaskMessageFull, + StreamTaskMessageDelta, +) from agentex.lib.core.services.adk.streaming import ( CoalescingBuffer, StreamingTaskMessageContext, @@ -520,3 +523,57 @@ async def test_open_without_created_at_passes_omit(self) -> None: kwargs = client.messages.create.call_args.kwargs assert kwargs["created_at"] is omit + + +class TestFullMessageClosesBuffer: + """A StreamTaskMessageFull must stop the buffer ticker and drain its deltas + before the terminal Full. Marking the context done without closing the + buffer leaves close()'s _is_closed short-circuit to orphan the ticker, and + publishing buffered deltas after the Full reads as a stale duplicate tail.""" + + @pytest.mark.asyncio + async def test_full_message_stops_ticker(self) -> None: + ctx, _svc, tm = await _make_context("coalesced") + # A delta makes the buffer and its ticker live. + await ctx.stream_update(_text(tm, "hello")) + buf = ctx._buffer + assert buf is not None + task = buf._task + assert task is not None and not task.done() + + await ctx.stream_update( + StreamTaskMessageFull( + parent_task_message=tm, + content=TextContent(author="agent", content="final", format="markdown"), + type="full", + ) + ) + + assert ctx._buffer is None, "Full message left the buffer un-closed" + assert task.done(), "coalescing-buffer ticker still running after Full (orphaned)" + + @pytest.mark.asyncio + async def test_full_is_terminal_publish_no_trailing_deltas(self) -> None: + # Buffered deltas must publish BEFORE the Full, never after (a trailing + # delta after the terminal Full reads as a stale duplicate tail). + ctx, svc, tm = await _make_context("coalesced") + # "alpha" flushes immediately; "beta" stays buffered in the window. + await ctx.stream_update(_text(tm, "alpha")) + await ctx.stream_update(_text(tm, "beta")) + + full = StreamTaskMessageFull( + parent_task_message=tm, + content=TextContent(author="agent", content="alphabeta", format="markdown"), + type="full", + ) + await ctx.stream_update(full) + + published = [c.args[0] for c in svc.stream_update.await_args_list] + assert published, "nothing was published" + assert published[-1] is full, ( + f"Full must be the terminal publish; saw trailing " + f"{type(published[-1]).__name__} after it (stale duplicate tail)" + ) + assert any(isinstance(u, StreamTaskMessageDelta) for u in published[:-1]), ( + "expected the buffered deltas to be published before the Full" + ) From 28962954012b003091d049e6770c39eda387252a Mon Sep 17 00:00:00 2001 From: Endre Berki Date: Thu, 25 Jun 2026 17:21:47 +0200 Subject: [PATCH 2/6] fix(streaming): re-check _closed under lock to avoid stranding a racing delta MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit add() checked _closed before taking the lock, so a delta racing a Full-driven buffer close() could pass the check, then append after the buffer was drained and its ticker shut down — stranding the delta, never published. Re-check _closed under the lock before appending. Co-Authored-By: Claude Opus 4.8 --- src/agentex/lib/core/services/adk/streaming.py | 6 ++++++ tests/lib/core/services/adk/test_streaming.py | 18 ++++++++++++++++++ 2 files changed, 24 insertions(+) diff --git a/src/agentex/lib/core/services/adk/streaming.py b/src/agentex/lib/core/services/adk/streaming.py index deacda88b..f47e852d0 100644 --- a/src/agentex/lib/core/services/adk/streaming.py +++ b/src/agentex/lib/core/services/adk/streaming.py @@ -177,6 +177,12 @@ async def add(self, update: StreamTaskMessageDelta) -> None: if self._closed: return async with self._lock: + # Re-check under the lock: a concurrent close() (e.g. from a racing + # Full) may have drained and shut down the ticker after the check + # above but before we acquired the lock. Appending now would strand + # the delta in a dead buffer, never published. + if self._closed: + return self._buf.append(update) self._buf_chars += _delta_char_len(update.delta) if not self._first_flushed or self._buf_chars >= self.MAX_BUFFERED_CHARS: diff --git a/tests/lib/core/services/adk/test_streaming.py b/tests/lib/core/services/adk/test_streaming.py index f74b3ad32..0b00b6790 100644 --- a/tests/lib/core/services/adk/test_streaming.py +++ b/tests/lib/core/services/adk/test_streaming.py @@ -355,6 +355,24 @@ async def on_flush(u: StreamTaskMessageDelta) -> None: await buf.add(_text(task_message, "after")) assert flushed == [] + @pytest.mark.asyncio + async def test_add_racing_close_is_not_stranded(self, task_message: TaskMessage) -> None: + """TOCTOU: a delta that passes add()'s pre-lock _closed check but only + acquires the lock after close() set _closed must be dropped, not appended + to a drained, ticker-less buffer where it would never be published.""" + buf = CoalescingBuffer(on_flush=AsyncMock()) + buf.start() + # Hold the lock so add() parks *after* its pre-lock _closed check. + await buf._lock.acquire() + add_task = asyncio.create_task(buf.add(_text(task_message, "racing"))) + await asyncio.sleep(0) # add() passes the _closed check, blocks on the lock + buf._closed = True # close() wins the race + buf._lock.release() + await add_task + + assert buf._buf == [], "racing delta was stranded in the closed buffer" + await buf.close() # cleanup + class TestCoalescingBufferCloseDuringFlush: @pytest.mark.asyncio From b462ec07fd7068fc58d855642696d919cdde7d5a Mon Sep 17 00:00:00 2001 From: Endre Berki Date: Fri, 26 Jun 2026 09:33:38 +0200 Subject: [PATCH 3/6] refactor(streaming): _reap_buffer helper + _is_closing guard for late deltas Review follow-ups on the StreamTaskMessageFull fix: - Extract _reap_buffer() and use it in both close() and the Full branch. - Add an _is_closing flag set when a terminal (Full/Done) starts processing; the delta path drops a late delta that races in after the buffer is reaped but before _is_closed is set, so it can't publish after the terminal. - Clarify the ordering-test comment (batching of the two deltas is irrelevant; the invariant is deltas-before-Full). Co-Authored-By: Claude Opus 4.8 --- .../lib/core/services/adk/streaming.py | 33 +++++++++++++++---- tests/lib/core/services/adk/test_streaming.py | 28 +++++++++++++++- 2 files changed, 54 insertions(+), 7 deletions(-) diff --git a/src/agentex/lib/core/services/adk/streaming.py b/src/agentex/lib/core/services/adk/streaming.py index f47e852d0..1794221ec 100644 --- a/src/agentex/lib/core/services/adk/streaming.py +++ b/src/agentex/lib/core/services/adk/streaming.py @@ -386,6 +386,9 @@ def __init__( self._agentex_client = agentex_client self._streaming_service = streaming_service self._is_closed = False + # Set once a terminal (Full/Done) starts processing, so a concurrent + # delta can't publish after the terminal during the close window. + self._is_closing = False self._delta_accumulator = DeltaAccumulator() self._streaming_mode: StreamingMode = streaming_mode self._buffer: CoalescingBuffer | None = None @@ -399,6 +402,7 @@ async def __aexit__(self, exc_type, exc_val, exc_tb): async def open(self) -> "StreamingTaskMessageContext": self._is_closed = False + self._is_closing = False self.task_message = await self._agentex_client.messages.create( task_id=self.task_id, @@ -421,6 +425,15 @@ async def open(self) -> "StreamingTaskMessageContext": return self + async def _reap_buffer(self) -> None: + """Drain and stop the coalescing buffer, releasing its background ticker. + + Idempotent: a no-op once the buffer has already been reaped. + """ + if self._buffer is not None: + await self._buffer.close() + self._buffer = None + async def close(self) -> TaskMessage: """Close the streaming context.""" if not self.task_message: @@ -430,9 +443,7 @@ async def close(self) -> TaskMessage: # short-circuit, so a context already marked done by a Full update can't # leave the ticker orphaned. Draining here also lets consumers see the # full delta sequence in order before DONE. - if self._buffer is not None: - await self._buffer.close() - self._buffer = None + await self._reap_buffer() if self._is_closed: return self.task_message # Already done (buffer reaped above) @@ -493,15 +504,25 @@ async def stream_update(self, update: TaskMessageUpdate) -> TaskMessageUpdate | if self._streaming_mode == "coalesced" and self._buffer is not None: await self._buffer.add(update) return update + if self._is_closing: + # A terminal (Full/Done) is in flight and the buffer is already + # reaped; drop this late delta instead of publishing it after the + # terminal event. It is still recorded in the accumulator above. + return update + + # From here the update is terminal. Mark closing so a concurrent delta + # can't fall through and publish after the terminal once the buffer is + # reaped below. + if isinstance(update, (StreamTaskMessageFull, StreamTaskMessageDone)): + self._is_closing = True # A Full ends the stream and supersedes buffered deltas. Drain and stop # the buffer BEFORE publishing the Full, so leftover deltas land in order # (deltas -> Full) instead of trailing the terminal Full as a stale # duplicate tail. This also stops the ticker, which would otherwise be # orphaned when __aexit__'s close() short-circuits on _is_closed. - if isinstance(update, StreamTaskMessageFull) and self._buffer is not None: - await self._buffer.close() - self._buffer = None + if isinstance(update, StreamTaskMessageFull): + await self._reap_buffer() result = await self._streaming_service.stream_update(update) diff --git a/tests/lib/core/services/adk/test_streaming.py b/tests/lib/core/services/adk/test_streaming.py index 0b00b6790..36d31343d 100644 --- a/tests/lib/core/services/adk/test_streaming.py +++ b/tests/lib/core/services/adk/test_streaming.py @@ -575,7 +575,9 @@ async def test_full_is_terminal_publish_no_trailing_deltas(self) -> None: # Buffered deltas must publish BEFORE the Full, never after (a trailing # delta after the terminal Full reads as a stale duplicate tail). ctx, svc, tm = await _make_context("coalesced") - # "alpha" flushes immediately; "beta" stays buffered in the window. + # Two deltas through the buffer. Regardless of how the coalescing window + # batches them (1 or 2 publishes), the invariant under test is the same: + # every delta publishes before the terminal Full, never after it. await ctx.stream_update(_text(tm, "alpha")) await ctx.stream_update(_text(tm, "beta")) @@ -595,3 +597,27 @@ async def test_full_is_terminal_publish_no_trailing_deltas(self) -> None: assert any(isinstance(u, StreamTaskMessageDelta) for u in published[:-1]), ( "expected the buffered deltas to be published before the Full" ) + + @pytest.mark.asyncio + async def test_late_delta_during_close_window_is_not_published_after_full(self) -> None: + """Once a Full reaps the buffer, a delta racing in before _is_closed is + set (buffer already None) must be dropped, not published after the Full. + Simulates the concurrent window via the _is_closing flag directly.""" + ctx, svc, tm = await _make_context("coalesced") + await ctx.stream_update(_text(tm, "hello")) + + full = StreamTaskMessageFull( + parent_task_message=tm, + content=TextContent(author="agent", content="hello", format="markdown"), + type="full", + ) + await ctx.stream_update(full) + # Full set _is_closed; reopen the close window to mimic a delta arriving + # after the buffer was reaped but before the terminal finished. + ctx._is_closed = False + ctx._is_closing = True + svc.stream_update.reset_mock() + + await ctx.stream_update(_text(tm, "late")) + + assert svc.stream_update.await_count == 0, "late delta published after the terminal Full" From 87a6ad06ad23f07625304f46670c070acb8deccd Mon Sep 17 00:00:00 2001 From: Endre Berki Date: Fri, 26 Jun 2026 09:56:00 +0200 Subject: [PATCH 4/6] fix(streaming): mark _is_closing in close() and roll back on terminal failure Greptile follow-ups on the _is_closing guard: - close() sets _is_closing before reaping the buffer, so a delta racing a direct close() (e.g. via __aexit__) can't slip past the None buffer and publish after DONE. - A failed terminal write (e.g. messages.update raising) rolls _is_closing back to False, so the still-open context doesn't silently drop later deltas. Co-Authored-By: Claude Opus 4.8 --- .../lib/core/services/adk/streaming.py | 126 ++++++++++-------- tests/lib/core/services/adk/test_streaming.py | 37 +++++ 2 files changed, 108 insertions(+), 55 deletions(-) diff --git a/src/agentex/lib/core/services/adk/streaming.py b/src/agentex/lib/core/services/adk/streaming.py index 1794221ec..d3006572c 100644 --- a/src/agentex/lib/core/services/adk/streaming.py +++ b/src/agentex/lib/core/services/adk/streaming.py @@ -439,41 +439,51 @@ async def close(self) -> TaskMessage: if not self.task_message: raise ValueError("Context not properly initialized - no task message") - # Reap the buffer (stopping its ticker) before the _is_closed - # short-circuit, so a context already marked done by a Full update can't - # leave the ticker orphaned. Draining here also lets consumers see the - # full delta sequence in order before DONE. - await self._reap_buffer() + # Mark closing before reaping so a concurrent coalesced delta can't slip + # past the now-None buffer and publish after DONE. Rolled back below if + # the terminal write fails, so a failed close doesn't wedge the context + # into silently dropping later deltas. + self._is_closing = True + try: + # Reap the buffer (stopping its ticker) before the _is_closed + # short-circuit, so a context already marked done by a Full update + # can't leave the ticker orphaned. Draining here also lets consumers + # see the full delta sequence in order before DONE. + await self._reap_buffer() - if self._is_closed: - return self.task_message # Already done (buffer reaped above) + if self._is_closed: + return self.task_message # Already done (buffer reaped above) - # Send the DONE event - done_event = StreamTaskMessageDone( - parent_task_message=self.task_message, - type="done", - ) - await self._streaming_service.stream_update(done_event) + # Send the DONE event + done_event = StreamTaskMessageDone( + parent_task_message=self.task_message, + type="done", + ) + await self._streaming_service.stream_update(done_event) - # Update the task message with the final content - has_deltas = ( - self._delta_accumulator._accumulated_deltas - or self._delta_accumulator._reasoning_summaries - or self._delta_accumulator._reasoning_contents - ) - if has_deltas: - self.task_message.content = self._delta_accumulator.convert_to_content() + # Update the task message with the final content + has_deltas = ( + self._delta_accumulator._accumulated_deltas + or self._delta_accumulator._reasoning_summaries + or self._delta_accumulator._reasoning_contents + ) + if has_deltas: + self.task_message.content = self._delta_accumulator.convert_to_content() - await self._agentex_client.messages.update( - task_id=self.task_id, - message_id=self.task_message.id, - content=self.task_message.content.model_dump(), - streaming_status="DONE", - ) + await self._agentex_client.messages.update( + task_id=self.task_id, + message_id=self.task_message.id, + content=self.task_message.content.model_dump(), + streaming_status="DONE", + ) - # Mark the context as done - self._is_closed = True - return self.task_message + # Mark the context as done + self._is_closed = True + return self.task_message + except Exception: + if not self._is_closed: + self._is_closing = False + raise async def stream_update(self, update: TaskMessageUpdate) -> TaskMessageUpdate | None: """Stream an update to the repository. @@ -512,32 +522,38 @@ async def stream_update(self, update: TaskMessageUpdate) -> TaskMessageUpdate | # From here the update is terminal. Mark closing so a concurrent delta # can't fall through and publish after the terminal once the buffer is - # reaped below. - if isinstance(update, (StreamTaskMessageFull, StreamTaskMessageDone)): + # reaped below. Rolled back if the terminal path fails, so a failed + # terminal doesn't wedge the context into silently dropping later deltas. + is_terminal = isinstance(update, (StreamTaskMessageFull, StreamTaskMessageDone)) + if is_terminal: self._is_closing = True - - # A Full ends the stream and supersedes buffered deltas. Drain and stop - # the buffer BEFORE publishing the Full, so leftover deltas land in order - # (deltas -> Full) instead of trailing the terminal Full as a stale - # duplicate tail. This also stops the ticker, which would otherwise be - # orphaned when __aexit__'s close() short-circuits on _is_closed. - if isinstance(update, StreamTaskMessageFull): - await self._reap_buffer() - - result = await self._streaming_service.stream_update(update) - - if isinstance(update, StreamTaskMessageDone): - await self.close() - return update - elif isinstance(update, StreamTaskMessageFull): - await self._agentex_client.messages.update( - task_id=self.task_id, - message_id=update.parent_task_message.id, # type: ignore[union-attr] - content=update.content.model_dump(), - streaming_status="DONE", - ) - self._is_closed = True - return result + try: + # A Full ends the stream and supersedes buffered deltas. Drain and + # stop the buffer BEFORE publishing the Full, so leftover deltas land + # in order (deltas -> Full) instead of trailing the terminal Full as + # a stale duplicate tail. This also stops the ticker, which would + # otherwise be orphaned when __aexit__'s close() short-circuits. + if isinstance(update, StreamTaskMessageFull): + await self._reap_buffer() + + result = await self._streaming_service.stream_update(update) + + if isinstance(update, StreamTaskMessageDone): + await self.close() + return update + elif isinstance(update, StreamTaskMessageFull): + await self._agentex_client.messages.update( + task_id=self.task_id, + message_id=update.parent_task_message.id, # type: ignore[union-attr] + content=update.content.model_dump(), + streaming_status="DONE", + ) + self._is_closed = True + return result + except Exception: + if not self._is_closed: + self._is_closing = False + raise class StreamingService: diff --git a/tests/lib/core/services/adk/test_streaming.py b/tests/lib/core/services/adk/test_streaming.py index 36d31343d..7099cc41d 100644 --- a/tests/lib/core/services/adk/test_streaming.py +++ b/tests/lib/core/services/adk/test_streaming.py @@ -621,3 +621,40 @@ async def test_late_delta_during_close_window_is_not_published_after_full(self) await ctx.stream_update(_text(tm, "late")) assert svc.stream_update.await_count == 0, "late delta published after the terminal Full" + + @pytest.mark.asyncio + async def test_close_marks_closing_before_reaping_buffer(self) -> None: + """close() must set _is_closing before reaping, so a concurrent delta in + the post-reap / pre-_is_closed window is dropped, not published after DONE.""" + ctx, _svc, tm = await _make_context("coalesced") + await ctx.stream_update(_text(tm, "hi")) + + closing_at_reap = {} + orig_reap = ctx._reap_buffer + + async def spy() -> None: + closing_at_reap["value"] = ctx._is_closing + await orig_reap() + + ctx._reap_buffer = spy # type: ignore[method-assign] + await ctx.close() + assert closing_at_reap.get("value") is True, "close() must mark _is_closing before reaping" + + @pytest.mark.asyncio + async def test_failed_terminal_rolls_back_is_closing(self) -> None: + """If the terminal persistence fails, _is_closing must roll back so the + still-open context doesn't silently drop later coalesced deltas.""" + ctx, _svc, tm = await _make_context("coalesced") + await ctx.stream_update(_text(tm, "hi")) + ctx._agentex_client.messages.update.side_effect = RuntimeError("persist boom") # type: ignore[attr-defined] + + full = StreamTaskMessageFull( + parent_task_message=tm, + content=TextContent(author="agent", content="hi", format="markdown"), + type="full", + ) + with pytest.raises(RuntimeError): + await ctx.stream_update(full) + + assert ctx._is_closed is False + assert ctx._is_closing is False, "failed terminal left _is_closing stuck on" From 6772a402ea29013658311650d516a957621275f3 Mon Sep 17 00:00:00 2001 From: Endre Berki Date: Fri, 26 Jun 2026 10:09:17 +0200 Subject: [PATCH 5/6] Revert "fix(streaming): mark _is_closing in close() and roll back on terminal failure" This reverts commit 87a6ad06ad23f07625304f46670c070acb8deccd. --- .../lib/core/services/adk/streaming.py | 126 ++++++++---------- tests/lib/core/services/adk/test_streaming.py | 37 ----- 2 files changed, 55 insertions(+), 108 deletions(-) diff --git a/src/agentex/lib/core/services/adk/streaming.py b/src/agentex/lib/core/services/adk/streaming.py index d3006572c..1794221ec 100644 --- a/src/agentex/lib/core/services/adk/streaming.py +++ b/src/agentex/lib/core/services/adk/streaming.py @@ -439,51 +439,41 @@ async def close(self) -> TaskMessage: if not self.task_message: raise ValueError("Context not properly initialized - no task message") - # Mark closing before reaping so a concurrent coalesced delta can't slip - # past the now-None buffer and publish after DONE. Rolled back below if - # the terminal write fails, so a failed close doesn't wedge the context - # into silently dropping later deltas. - self._is_closing = True - try: - # Reap the buffer (stopping its ticker) before the _is_closed - # short-circuit, so a context already marked done by a Full update - # can't leave the ticker orphaned. Draining here also lets consumers - # see the full delta sequence in order before DONE. - await self._reap_buffer() + # Reap the buffer (stopping its ticker) before the _is_closed + # short-circuit, so a context already marked done by a Full update can't + # leave the ticker orphaned. Draining here also lets consumers see the + # full delta sequence in order before DONE. + await self._reap_buffer() - if self._is_closed: - return self.task_message # Already done (buffer reaped above) + if self._is_closed: + return self.task_message # Already done (buffer reaped above) - # Send the DONE event - done_event = StreamTaskMessageDone( - parent_task_message=self.task_message, - type="done", - ) - await self._streaming_service.stream_update(done_event) + # Send the DONE event + done_event = StreamTaskMessageDone( + parent_task_message=self.task_message, + type="done", + ) + await self._streaming_service.stream_update(done_event) - # Update the task message with the final content - has_deltas = ( - self._delta_accumulator._accumulated_deltas - or self._delta_accumulator._reasoning_summaries - or self._delta_accumulator._reasoning_contents - ) - if has_deltas: - self.task_message.content = self._delta_accumulator.convert_to_content() + # Update the task message with the final content + has_deltas = ( + self._delta_accumulator._accumulated_deltas + or self._delta_accumulator._reasoning_summaries + or self._delta_accumulator._reasoning_contents + ) + if has_deltas: + self.task_message.content = self._delta_accumulator.convert_to_content() - await self._agentex_client.messages.update( - task_id=self.task_id, - message_id=self.task_message.id, - content=self.task_message.content.model_dump(), - streaming_status="DONE", - ) + await self._agentex_client.messages.update( + task_id=self.task_id, + message_id=self.task_message.id, + content=self.task_message.content.model_dump(), + streaming_status="DONE", + ) - # Mark the context as done - self._is_closed = True - return self.task_message - except Exception: - if not self._is_closed: - self._is_closing = False - raise + # Mark the context as done + self._is_closed = True + return self.task_message async def stream_update(self, update: TaskMessageUpdate) -> TaskMessageUpdate | None: """Stream an update to the repository. @@ -522,38 +512,32 @@ async def stream_update(self, update: TaskMessageUpdate) -> TaskMessageUpdate | # From here the update is terminal. Mark closing so a concurrent delta # can't fall through and publish after the terminal once the buffer is - # reaped below. Rolled back if the terminal path fails, so a failed - # terminal doesn't wedge the context into silently dropping later deltas. - is_terminal = isinstance(update, (StreamTaskMessageFull, StreamTaskMessageDone)) - if is_terminal: + # reaped below. + if isinstance(update, (StreamTaskMessageFull, StreamTaskMessageDone)): self._is_closing = True - try: - # A Full ends the stream and supersedes buffered deltas. Drain and - # stop the buffer BEFORE publishing the Full, so leftover deltas land - # in order (deltas -> Full) instead of trailing the terminal Full as - # a stale duplicate tail. This also stops the ticker, which would - # otherwise be orphaned when __aexit__'s close() short-circuits. - if isinstance(update, StreamTaskMessageFull): - await self._reap_buffer() - - result = await self._streaming_service.stream_update(update) - - if isinstance(update, StreamTaskMessageDone): - await self.close() - return update - elif isinstance(update, StreamTaskMessageFull): - await self._agentex_client.messages.update( - task_id=self.task_id, - message_id=update.parent_task_message.id, # type: ignore[union-attr] - content=update.content.model_dump(), - streaming_status="DONE", - ) - self._is_closed = True - return result - except Exception: - if not self._is_closed: - self._is_closing = False - raise + + # A Full ends the stream and supersedes buffered deltas. Drain and stop + # the buffer BEFORE publishing the Full, so leftover deltas land in order + # (deltas -> Full) instead of trailing the terminal Full as a stale + # duplicate tail. This also stops the ticker, which would otherwise be + # orphaned when __aexit__'s close() short-circuits on _is_closed. + if isinstance(update, StreamTaskMessageFull): + await self._reap_buffer() + + result = await self._streaming_service.stream_update(update) + + if isinstance(update, StreamTaskMessageDone): + await self.close() + return update + elif isinstance(update, StreamTaskMessageFull): + await self._agentex_client.messages.update( + task_id=self.task_id, + message_id=update.parent_task_message.id, # type: ignore[union-attr] + content=update.content.model_dump(), + streaming_status="DONE", + ) + self._is_closed = True + return result class StreamingService: diff --git a/tests/lib/core/services/adk/test_streaming.py b/tests/lib/core/services/adk/test_streaming.py index 7099cc41d..36d31343d 100644 --- a/tests/lib/core/services/adk/test_streaming.py +++ b/tests/lib/core/services/adk/test_streaming.py @@ -621,40 +621,3 @@ async def test_late_delta_during_close_window_is_not_published_after_full(self) await ctx.stream_update(_text(tm, "late")) assert svc.stream_update.await_count == 0, "late delta published after the terminal Full" - - @pytest.mark.asyncio - async def test_close_marks_closing_before_reaping_buffer(self) -> None: - """close() must set _is_closing before reaping, so a concurrent delta in - the post-reap / pre-_is_closed window is dropped, not published after DONE.""" - ctx, _svc, tm = await _make_context("coalesced") - await ctx.stream_update(_text(tm, "hi")) - - closing_at_reap = {} - orig_reap = ctx._reap_buffer - - async def spy() -> None: - closing_at_reap["value"] = ctx._is_closing - await orig_reap() - - ctx._reap_buffer = spy # type: ignore[method-assign] - await ctx.close() - assert closing_at_reap.get("value") is True, "close() must mark _is_closing before reaping" - - @pytest.mark.asyncio - async def test_failed_terminal_rolls_back_is_closing(self) -> None: - """If the terminal persistence fails, _is_closing must roll back so the - still-open context doesn't silently drop later coalesced deltas.""" - ctx, _svc, tm = await _make_context("coalesced") - await ctx.stream_update(_text(tm, "hi")) - ctx._agentex_client.messages.update.side_effect = RuntimeError("persist boom") # type: ignore[attr-defined] - - full = StreamTaskMessageFull( - parent_task_message=tm, - content=TextContent(author="agent", content="hi", format="markdown"), - type="full", - ) - with pytest.raises(RuntimeError): - await ctx.stream_update(full) - - assert ctx._is_closed is False - assert ctx._is_closing is False, "failed terminal left _is_closing stuck on" From 61a507e15e1b9ef22b308fdc1798c88b68dbfb3b Mon Sep 17 00:00:00 2001 From: Endre Berki Date: Fri, 26 Jun 2026 10:32:54 +0200 Subject: [PATCH 6/6] refactor(streaming): drop _is_closing concurrency guard, keep Full-leak fix minimal The _is_closing flag was half-wired (set in stream_update but not close()) and only mattered under concurrent stream_update calls, which the buffer's single-producer design doesn't support. Remove it to keep this PR scoped to the StreamTaskMessageFull ticker-leak fix. Concurrency hardening and Done ordering are tracked in #418. Co-Authored-By: Claude Opus 4.8 --- .../lib/core/services/adk/streaming.py | 15 ------------ tests/lib/core/services/adk/test_streaming.py | 24 ------------------- 2 files changed, 39 deletions(-) diff --git a/src/agentex/lib/core/services/adk/streaming.py b/src/agentex/lib/core/services/adk/streaming.py index 1794221ec..33ca7bc1c 100644 --- a/src/agentex/lib/core/services/adk/streaming.py +++ b/src/agentex/lib/core/services/adk/streaming.py @@ -386,9 +386,6 @@ def __init__( self._agentex_client = agentex_client self._streaming_service = streaming_service self._is_closed = False - # Set once a terminal (Full/Done) starts processing, so a concurrent - # delta can't publish after the terminal during the close window. - self._is_closing = False self._delta_accumulator = DeltaAccumulator() self._streaming_mode: StreamingMode = streaming_mode self._buffer: CoalescingBuffer | None = None @@ -402,7 +399,6 @@ async def __aexit__(self, exc_type, exc_val, exc_tb): async def open(self) -> "StreamingTaskMessageContext": self._is_closed = False - self._is_closing = False self.task_message = await self._agentex_client.messages.create( task_id=self.task_id, @@ -504,17 +500,6 @@ async def stream_update(self, update: TaskMessageUpdate) -> TaskMessageUpdate | if self._streaming_mode == "coalesced" and self._buffer is not None: await self._buffer.add(update) return update - if self._is_closing: - # A terminal (Full/Done) is in flight and the buffer is already - # reaped; drop this late delta instead of publishing it after the - # terminal event. It is still recorded in the accumulator above. - return update - - # From here the update is terminal. Mark closing so a concurrent delta - # can't fall through and publish after the terminal once the buffer is - # reaped below. - if isinstance(update, (StreamTaskMessageFull, StreamTaskMessageDone)): - self._is_closing = True # A Full ends the stream and supersedes buffered deltas. Drain and stop # the buffer BEFORE publishing the Full, so leftover deltas land in order diff --git a/tests/lib/core/services/adk/test_streaming.py b/tests/lib/core/services/adk/test_streaming.py index 36d31343d..a8068f307 100644 --- a/tests/lib/core/services/adk/test_streaming.py +++ b/tests/lib/core/services/adk/test_streaming.py @@ -597,27 +597,3 @@ async def test_full_is_terminal_publish_no_trailing_deltas(self) -> None: assert any(isinstance(u, StreamTaskMessageDelta) for u in published[:-1]), ( "expected the buffered deltas to be published before the Full" ) - - @pytest.mark.asyncio - async def test_late_delta_during_close_window_is_not_published_after_full(self) -> None: - """Once a Full reaps the buffer, a delta racing in before _is_closed is - set (buffer already None) must be dropped, not published after the Full. - Simulates the concurrent window via the _is_closing flag directly.""" - ctx, svc, tm = await _make_context("coalesced") - await ctx.stream_update(_text(tm, "hello")) - - full = StreamTaskMessageFull( - parent_task_message=tm, - content=TextContent(author="agent", content="hello", format="markdown"), - type="full", - ) - await ctx.stream_update(full) - # Full set _is_closed; reopen the close window to mimic a delta arriving - # after the buffer was reaped but before the terminal finished. - ctx._is_closed = False - ctx._is_closing = True - svc.stream_update.reset_mock() - - await ctx.stream_update(_text(tm, "late")) - - assert svc.stream_update.await_count == 0, "late delta published after the terminal Full"