fix(streaming): StreamTaskMessageFull closes the coalescing buffer #426

Open
eberki-scale wants to merge 2 commits into next from endre/full-closes-buffer
@eberki-scale eberki-scale commented Jun 23, 2026

Summary

A StreamTaskMessageFull ends the stream and marks the StreamingTaskMessageContext done, but it did not close the coalescing buffer. When the context later exits, __aexit__ → close() hits its if self._is_closed: return guard and never stops the buffer's background ticker — leaving an orphaned task per such stream. Separately, any deltas still buffered when the Full arrives could be published after the terminal Full, which a consumer treating Full as the final message reads as a stale duplicate tail.

Fix

Two small changes to StreamingTaskMessageContext in agentex/lib/core/services/adk/streaming.py:

  1. stream_update — when a StreamTaskMessageFull arrives, drain and close the buffer before publishing the Full, so leftover deltas land in order (deltas → Full) and the ticker is stopped.
  2. close() — reap the buffer before the _is_closed short-circuit, so a context already marked done (by a Full on another path) can never leave the ticker orphaned.

No change to CoalescingBuffer itself.
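
To make the two changes concrete, here is a minimal sketch rather than the actual code in streaming.py. `SketchBuffer` and `SketchContext` are hypothetical stand-ins for CoalescingBuffer and StreamingTaskMessageContext, and the string message kinds are an assumption made for brevity.

```python
import asyncio


class SketchBuffer:
    """Hypothetical stand-in for CoalescingBuffer: holds deltas and runs a ticker."""

    def __init__(self, publish):
        self._publish = publish
        self._items = []
        self._ticker = asyncio.create_task(self._tick())

    async def _tick(self):
        # Placeholder for the periodic flush loop; runs until close() cancels it.
        while True:
            await asyncio.sleep(3600)

    def add(self, delta):
        self._items.append(delta)

    async def close(self):
        # Drain whatever is still buffered, then stop the ticker.
        for item in self._items:
            await self._publish(item)
        self._items.clear()
        self._ticker.cancel()
        try:
            await self._ticker
        except asyncio.CancelledError:
            pass


class SketchContext:
    """Hypothetical stand-in for StreamingTaskMessageContext."""

    def __init__(self):
        self.published = []
        self._is_closed = False
        self._buffer = SketchBuffer(self._publish)

    async def _publish(self, item):
        self.published.append(item)

    async def stream_update(self, kind, payload):
        if kind == "delta":
            self._buffer.add(payload)
            return
        # Change 1: drain and close the buffer before publishing the Full.
        if self._buffer is not None:
            buffer, self._buffer = self._buffer, None
            await buffer.close()
        await self._publish(payload)
        self._is_closed = True

    async def close(self):
        # Change 2: reap the buffer before the _is_closed short-circuit.
        if self._buffer is not None:
            buffer, self._buffer = self._buffer, None
            await buffer.close()
        if self._is_closed:
            return
        self._is_closed = True


async def demo():
    ctx = SketchContext()
    ticker = ctx._buffer._ticker
    await ctx.stream_update("delta", "delta:buffered-tail")
    await ctx.stream_update("full", "full:FINAL")
    await ctx.close()
    return ctx.published, ticker.done()
```

Running `asyncio.run(demo())` in this sketch yields the buffered delta before the Full, and the ticker task is finished by the time the context closes.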

Tests

New TestFullMessageClosesBuffer:

  • test_full_message_stops_ticker — after a Full, the buffer is reaped and its ticker task is done() (not orphaned).
  • test_full_is_terminal_publish_no_trailing_deltas — buffered deltas publish before the Full; the Full is the terminal publish.

Full streaming suite: 33 passed (tests/lib/core/services/adk/test_streaming.py).

Scope

Deliberately narrow — just the StreamTaskMessageFull orphan + ordering fix, split out from the broader buffer work in #418 for easier review.

Greptile Summary

  • Updates StreamingTaskMessageContext so terminal StreamTaskMessageFull messages drain and close the coalescing buffer before publishing the full message.
  • Ensures close() reaps the coalescing buffer even when the context was already marked closed.
  • Adds streaming tests covering ticker cleanup, terminal publish ordering, and add/close race handling.

Confidence Score: 5/5

The streaming lifecycle changes are narrow, directly covered by targeted tests, and do not introduce broader API or persistence risk.

The changes address buffer draining, ticker cleanup, and terminal publish ordering in the affected context, with focused tests exercising the intended edge cases.

T-Rex Logs

What T-Rex did

  • Proof 0, before (base): the run published start, delta:first-, full:FINAL, followed by a stale trailing delta:buffered-tail.
  • Proof 0, after (head): the run published start, delta:first-, delta:buffered-tail, then full:FINAL; buffer_after_full_is_none was true, ticker_done_after_full was true, and no coalescing-buffer tasks remained alive after close.
  • Proof 1, before: trex-artifacts/coalescing-race-guard-01-before.log shows the commit ending with buf_len=1, buf_contents=['racing'], flushed=0, RESULT=STRANDED at the race observation point.
  • Proof 1, after: trex-artifacts/coalescing-race-guard-02-after.log shows the commit ending with buf_len=0, buf_contents=[], flushed=0, RESULT=DROPPED at the same race observation point.

T-Rex Ran code and verified through T-Rex

Reviews (3): Last reviewed commit: "fix(streaming): re-check _closed under l..."

@stainless-app stainless-app Bot force-pushed the next branch 2 times, most recently from 1d86e8a to b30a90b Compare June 23, 2026 22:17
A StreamTaskMessageFull ends the stream and marks the context done, but it did
not close the coalescing buffer. __aexit__'s close() then short-circuits on
_is_closed and never stops the buffer's ticker, leaving an orphaned background
task. Buffered deltas could also publish after the terminal Full, which a
consumer treating Full as final reads as a stale duplicate tail.

Drain and stop the buffer before publishing the Full (deltas -> Full ordering),
and reap the buffer in close() before the _is_closed short-circuit so it can't
be orphaned on any path.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@eberki-scale eberki-scale force-pushed the endre/full-closes-buffer branch from d15800e to a75aa68 Compare June 25, 2026 14:30
Comment thread src/agentex/lib/core/services/adk/streaming.py
…ng delta

add() checked _closed before taking the lock, so a delta racing a Full-driven
buffer close() could pass the check, then append after the buffer was drained
and its ticker shut down — stranding the delta, never published. Re-check
_closed under the lock before appending.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
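
A minimal sketch of the re-check this commit message describes, not the real CoalescingBuffer. `SketchBuffer`, its boolean return value, and the `demo` driver are hypothetical; the driver plays the part of an in-flight close() by holding the lock while add() waits.

```python
import asyncio


class SketchBuffer:
    """Hypothetical buffer showing the double check around the lock."""

    def __init__(self):
        self._lock = asyncio.Lock()
        self._closed = False
        self._items = []

    async def add(self, delta):
        # Fast path: cheap unlocked check, which can race with close().
        if self._closed:
            return False
        async with self._lock:
            # Re-check under the lock: close() may have finished while we waited.
            if self._closed:
                return False
            self._items.append(delta)
            return True


async def demo():
    buf = SketchBuffer()
    # Play the role of an in-flight close(): take the lock first.
    await buf._lock.acquire()
    add_task = asyncio.create_task(buf.add("racing"))
    # Let add() pass the unlocked check and start waiting on the lock.
    await asyncio.sleep(0)
    # The simulated close() marks the buffer closed, then releases the lock.
    buf._closed = True
    buf._lock.release()
    accepted = await add_task
    return accepted, list(buf._items)
```

In this sketch the racing delta is rejected instead of being appended to a buffer that nothing will flush again, which mirrors the STRANDED to DROPPED change in the T-Rex logs.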
# (deltas -> Full) instead of trailing the terminal Full as a stale
# duplicate tail. This also stops the ticker, which would otherwise be
# orphaned when __aexit__'s close() short-circuits on _is_closed.
if isinstance(update, StreamTaskMessageFull) and self._buffer is not None:

I saw that Greptile also commented about this, and it looks like it resolved its own comment, but it looks to me like we could still have a race here, e.g.:

  1. This code block closes the buffer and sets it to none
  2. stream_update (or messages.update) below is being awaited
  3. another delta comes in and skips this block since buffer is already none but _is_closed is not yet set to True

In this case, consumers will still see a delta after full since it just falls through to publishing directly. If this is something we care about (and my logic checks out and you also believe there's a race), we could have a terminal-in-progress flag like _is_closing set before awaiting buffer close to reject deltas?
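
A rough sketch of what the suggested flag could look like, under assumptions: `SketchContext` is hypothetical, the buffer is left out to isolate the flag, and `_is_closing` is the name proposed above rather than anything in the current code. The flag is set before the first await on the terminal path, so a delta arriving mid-publish is rejected.

```python
import asyncio


class SketchContext:
    """Hypothetical context; only the terminal-in-progress flag is modeled."""

    def __init__(self):
        self.published = []
        self._is_closing = False
        self._is_closed = False

    async def _publish(self, item):
        # Yield to the event loop, as a real network publish would.
        await asyncio.sleep(0)
        self.published.append(item)

    async def stream_update(self, kind, payload):
        if kind == "delta":
            # Reject deltas once a terminal message is in flight or finished.
            if self._is_closing or self._is_closed:
                return False
            await self._publish(payload)
            return True
        # Terminal Full: raise the flag before the first await.
        self._is_closing = True
        await self._publish(payload)
        self._is_closed = True
        return True


async def demo():
    ctx = SketchContext()
    full = asyncio.create_task(ctx.stream_update("full", "full:FINAL"))
    # Let the Full start publishing and suspend inside _publish.
    await asyncio.sleep(0)
    late = await ctx.stream_update("delta", "delta:late")
    await full
    return late, ctx.published
```

Whether rejected deltas should be dropped silently or raise is a separate design choice that this sketch does not settle.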

result = await self._streaming_service.stream_update(update)

if isinstance(update, StreamTaskMessageDone):
    await self.close()

if i'm reading this correctly, this code block still has a latent bug similar to the one we fixed for the Full case above, right? curious about just abstracting the code above into something like _reap_buffer and calling that here as well for readability and more defensiveness going forward.

# leave the ticker orphaned. Draining here also lets consumers see the
# full delta sequence in order before DONE.
if self._buffer is not None:
    await self._buffer.close()

think it would be ever so slightly cleaner to abstract this into _reap_buffer, even if we're only calling it in two places, and then call it here and below
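
For illustration, one possible shape for such a helper. This is a sketch under assumptions: `_reap_buffer` is the name suggested in this thread, while `FakeBuffer`, `SketchContext`, and `on_terminal` are hypothetical. Detaching the buffer before awaiting its close keeps the helper safe to call from more than one path.

```python
import asyncio


class FakeBuffer:
    """Hypothetical buffer that only counts how often close() is called."""

    def __init__(self):
        self.close_calls = 0

    async def close(self):
        self.close_calls += 1


class SketchContext:
    def __init__(self, buffer):
        self._buffer = buffer
        self._is_closed = False

    async def _reap_buffer(self):
        # Detach first so any later caller sees None and does nothing.
        buffer, self._buffer = self._buffer, None
        if buffer is not None:
            await buffer.close()

    async def on_terminal(self):
        # Shared by the terminal paths (Full or Done) in this sketch.
        await self._reap_buffer()
        self._is_closed = True

    async def close(self):
        # Reap before the short-circuit so the ticker cannot be orphaned.
        await self._reap_buffer()
        if self._is_closed:
            return
        self._is_closed = True


async def demo():
    buffer = FakeBuffer()
    ctx = SketchContext(buffer)
    await ctx.on_terminal()
    await ctx.close()
    return buffer.close_calls, ctx._buffer
```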

# Buffered deltas must publish BEFORE the Full, never after (a trailing
# delta after the terminal Full reads as a stale duplicate tail).
ctx, svc, tm = await _make_context("coalesced")
# "alpha" flushes immediately; "beta" stays buffered in the window.

technically, "beta" may or may not stay buffered in the window since the assertions only need the Full to be the terminal publish and 1 or more delta before it, right? if so, i think this test is still a solid one as written but maybe just worth updating the comment to be more precise (or adding a clarification to the assertions below)
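
One way to spell out that weaker contract, as a sketch only: the helper below assumes publishes are recorded as hypothetical `(kind, payload)` tuples, which is not necessarily how the test's fake service stores them. It checks that exactly one Full exists, that it is last, and that at least one delta precedes it, without caring how the deltas were coalesced.

```python
def assert_full_is_terminal(published):
    """Check ordering only; coalescing of the deltas is left unconstrained."""
    kinds = [kind for kind, _payload in published]
    # The Full must be the final publish, and there must be only one.
    assert kinds[-1] == "full"
    assert kinds.count("full") == 1
    # At least one delta must have been published before the Full.
    assert "delta" in kinds[:-1]
```

Both `[("delta", "alpha"), ("delta", "beta"), ("full", "FINAL")]` and `[("delta", "alphabeta"), ("full", "FINAL")]` pass, while a sequence with a delta after the Full fails.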

@smoreinis smoreinis left a comment

thanks for this fix - overall the PR is solid and I'm fine with it going in as-is.
just a couple of minor nits in the comments that I think could be nice to haves, but happy to leave these at the author discretion.

2 participants