fix(streaming): StreamTaskMessageFull closes the coalescing buffer #426
eberki-scale wants to merge 2 commits into
A StreamTaskMessageFull ends the stream and marks the context done, but it did not close the coalescing buffer. __aexit__'s close() then short-circuits on _is_closed and never stops the buffer's ticker, leaving an orphaned background task. Buffered deltas could also publish after the terminal Full, which a consumer treating Full as final reads as a stale duplicate tail.

Drain and stop the buffer before publishing the Full (deltas -> Full ordering), and reap the buffer in close() before the _is_closed short-circuit so it can't be orphaned on any path.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
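To make the two changes in this commit message concrete, here is a minimal sketch of the intended ordering. It is not the code from this PR: SketchBuffer, SketchContext, the string-based "delta"/"full" kinds, and the 0.05s interval are all hypothetical stand-ins, and the real StreamingTaskMessageContext and CoalescingBuffer may differ in every detail.

```python
import asyncio


class SketchBuffer:
    """Hypothetical coalescing buffer: holds deltas and flushes them on a ticker."""

    def __init__(self, publish, interval=0.05):
        self._publish = publish
        self._pending = []
        # Must be constructed inside a running event loop.
        self.ticker = asyncio.create_task(self._tick(interval))

    async def _tick(self, interval):
        while True:
            await asyncio.sleep(interval)
            await self.flush()

    def add(self, delta):
        self._pending.append(delta)

    async def flush(self):
        while self._pending:
            await self._publish(self._pending.pop(0))

    async def close(self):
        # Stop the ticker first, then drain whatever is still buffered.
        self.ticker.cancel()
        await asyncio.gather(self.ticker, return_exceptions=True)
        await self.flush()


class SketchContext:
    """Hypothetical stand-in for the streaming context described above."""

    def __init__(self):
        self.published = []
        self._is_closed = False
        self._buffer = SketchBuffer(self._publish)

    async def _publish(self, item):
        self.published.append(item)

    async def stream_update(self, kind, payload):
        if kind == "delta":
            if self._buffer is not None:
                self._buffer.add(payload)
            else:
                await self._publish(payload)
            return
        # Terminal Full: drain and stop the buffer BEFORE publishing it.
        if self._buffer is not None:
            await self._buffer.close()
            self._buffer = None
        await self._publish(payload)
        self._is_closed = True

    async def close(self):
        # Reap the buffer before the short-circuit so no path orphans the ticker.
        if self._buffer is not None:
            await self._buffer.close()
            self._buffer = None
        if self._is_closed:
            return
        self._is_closed = True


async def demo():
    ctx = SketchContext()
    ticker = ctx._buffer.ticker
    await ctx.stream_update("delta", "alpha")
    await ctx.stream_update("delta", "beta")
    await ctx.stream_update("full", "FULL")
    await ctx.close()  # short-circuits on _is_closed; the buffer is already reaped
    return ctx.published, ticker.done()
```

Running asyncio.run(demo()) in this sketch yields the deltas first and the Full last, with the ticker task finished rather than left running.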
…ng delta

add() checked _closed before taking the lock, so a delta racing a Full-driven buffer close() could pass the check, then append after the buffer was drained and its ticker shut down. That delta was stranded and never published. Re-check _closed under the lock before appending.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
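The re-check described in this commit is the usual check, lock, check again pattern. The sketch below illustrates it under assumptions: LockedBufferSketch, its asyncio.Lock, the boolean return value of add(), and close() setting _closed while holding the lock are all hypothetical and are not taken from CoalescingBuffer.

```python
import asyncio


class LockedBufferSketch:
    """Hypothetical buffer showing the re-check of _closed under the lock."""

    def __init__(self):
        self._lock = asyncio.Lock()
        self._closed = False
        self._pending = []
        self.flushed = []

    async def add(self, delta):
        if self._closed:  # cheap early exit, not sufficient by itself
            return False
        async with self._lock:
            if self._closed:  # re-check: close() may have taken the lock first
                return False
            self._pending.append(delta)
            return True

    async def close(self):
        async with self._lock:
            self._closed = True
            # Drain everything that was buffered before the close.
            self.flushed.extend(self._pending)
            self._pending.clear()


async def demo():
    buf = LockedBufferSketch()
    await buf._lock.acquire()  # simulate an in-progress flush holding the lock
    closer = asyncio.create_task(buf.close())
    adder = asyncio.create_task(buf.add("late"))
    await asyncio.sleep(0)  # both tasks run up to the lock and wait on it
    buf._lock.release()
    await closer
    accepted = await adder
    return accepted, list(buf._pending)
```

In demo(), add() passes the unlocked check while _closed is still False, then queues behind close() for the lock. Without the second check it would append "late" to a buffer that has already been drained; with it, the delta is rejected and nothing is left stranded in _pending.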
# (deltas -> Full) instead of trailing the terminal Full as a stale
# duplicate tail. This also stops the ticker, which would otherwise be
# orphaned when __aexit__'s close() short-circuits on _is_closed.
if isinstance(update, StreamTaskMessageFull) and self._buffer is not None:
I saw that Greptile also commented about this and it looks like it resolved its own comment, but it looks to me like a race is still possible here, e.g.:
- This code block closes the buffer and sets it to None
- stream_update (or messages.update) below is being awaited
- another delta comes in and skips this block, since the buffer is already None but _is_closed is not yet set to True
In this case, consumers will still see a delta after the Full, since the delta just falls through to publishing directly. If this is something we care about (and my logic checks out and you also believe there's a race), we could set a terminal-in-progress flag like _is_closing before awaiting the buffer close, and use it to reject deltas?
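A rough sketch of what that flag could look like, purely as an illustration of the suggestion. ClosingFlagSketch, the string kinds, and the boolean return values are hypothetical; the buffer is left out for brevity, and _publish yields once to stand in for the awaited stream_update / messages.update call.

```python
import asyncio


class ClosingFlagSketch:
    """Hypothetical context with a terminal-in-progress flag."""

    def __init__(self):
        self.published = []
        self._is_closing = False
        self._is_closed = False

    async def _publish(self, item):
        await asyncio.sleep(0)  # yield, as a real network publish would
        self.published.append(item)

    async def stream_update(self, kind, payload):
        if kind == "delta":
            if self._is_closing or self._is_closed:
                return False  # reject: a terminal Full is already in flight
            await self._publish(payload)
            return True
        # Terminal Full: raise the flag BEFORE the first await.
        self._is_closing = True
        await self._publish(payload)
        self._is_closed = True
        return True


async def demo():
    ctx = ClosingFlagSketch()
    full = asyncio.create_task(ctx.stream_update("full", "FULL"))
    await asyncio.sleep(0)  # let the Full start and suspend inside its publish
    late = await ctx.stream_update("delta", "late")
    await full
    return late, ctx.published
```

Here the late delta arrives while the Full is still awaiting its publish and _is_closed is still False, which is the window described above. Because _is_closing was set before the first await, the delta is rejected and the Full stays the last publish. Whether a rejected delta should be dropped silently, logged, or raised is a separate design choice this sketch does not settle.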
result = await self._streaming_service.stream_update(update)

if isinstance(update, StreamTaskMessageDone):
    await self.close()
if i'm reading this correctly, this code block still has a latent bug similar to the one we fixed for the Full case above, right? curious about abstracting the code above into something like _reap_buffer and calling that here as well, for readability and more defensiveness going forward.
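For reference, a sketch of the kind of helper being suggested. Only the name _reap_buffer comes from this thread; FakeBuffer, ReapSketch, the string kinds, and reaping before the Done is published are assumptions made for illustration.

```python
import asyncio


class FakeBuffer:
    """Hypothetical buffer stub that only counts close() calls."""

    def __init__(self):
        self.close_calls = 0

    async def close(self):
        self.close_calls += 1


class ReapSketch:
    """Hypothetical context that routes every terminal path through one helper."""

    def __init__(self):
        self._buffer = FakeBuffer()
        self._is_closed = False
        self.published = []

    async def _reap_buffer(self):
        # Safe to call repeatedly: later calls see None and do nothing.
        if self._buffer is not None:
            await self._buffer.close()
            self._buffer = None

    async def stream_update(self, kind):
        if kind in ("full", "done"):
            await self._reap_buffer()  # drain before any terminal publish
        self.published.append(kind)
        if kind == "full":
            self._is_closed = True
        elif kind == "done":
            await self.close()

    async def close(self):
        await self._reap_buffer()  # before the short-circuit
        if self._is_closed:
            return
        self._is_closed = True


async def demo():
    ctx = ReapSketch()
    buf = ctx._buffer
    await ctx.stream_update("done")
    await ctx.close()
    return buf.close_calls, ctx._buffer
```

With a single helper, the Full path, the Done path, and close() cannot drift apart, and the buffer is closed exactly once no matter how many of those paths run.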
# leave the ticker orphaned. Draining here also lets consumers see the
# full delta sequence in order before DONE.
if self._buffer is not None:
    await self._buffer.close()
think it would be ever so slightly cleaner to abstract this into _reap_buffer, even if we're only calling it in two places, and then call it here and below
# Buffered deltas must publish BEFORE the Full, never after (a trailing
# delta after the terminal Full reads as a stale duplicate tail).
ctx, svc, tm = await _make_context("coalesced")
# "alpha" flushes immediately; "beta" stays buffered in the window.
technically, "beta" may or may not stay buffered in the window since the assertions only need the Full to be the terminal publish and 1 or more delta before it, right? if so, i think this test is still a solid one as written but maybe just worth updating the comment to be more precise (or adding a clarification to the assertions below)
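One way to spell out the weaker property the assertions actually need, as a sketch. The helper name and the (kind, payload) tuple shape are hypothetical and do not mirror the real test's mocks.

```python
def assert_full_is_terminal(published):
    """Check that the Full is the last publish and only deltas precede it."""
    kinds = [kind for kind, _payload in published]
    assert kinds[-1] == "full", "Full must be the terminal publish"
    assert kinds.count("full") == 1, "exactly one Full expected"
    assert len(kinds) >= 2, "at least one delta must precede the Full"
    assert all(kind == "delta" for kind in kinds[:-1]), "only deltas before the Full"
```

This passes whether "beta" was flushed separately or coalesced with "alpha" into a single delta, so it does not depend on what happened to be sitting in the window when the Full arrived.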
smoreinis left a comment
thanks for this fix - overall the PR is solid and I'm fine with it going in as-is.
just a couple of minor nits in the comments that I think could be nice-to-haves, but happy to leave these at the author's discretion.
Summary
A StreamTaskMessageFull ends the stream and marks the StreamingTaskMessageContext done, but it did not close the coalescing buffer. When the context later exits, __aexit__ → close() hits its if self._is_closed: return guard and never stops the buffer's background ticker, leaving an orphaned task per such stream. Separately, any deltas still buffered when the Full arrives could be published after the terminal Full, which a consumer treating Full as the final message reads as a stale duplicate tail.

Fix
Two small changes to StreamingTaskMessageContext in agentex/lib/core/services/adk/streaming.py:
- stream_update: when a StreamTaskMessageFull arrives, drain and close the buffer before publishing the Full, so leftover deltas land in order (deltas → Full) and the ticker is stopped.
- close(): reap the buffer before the _is_closed short-circuit, so a context already marked done (by a Full on another path) can never leave the ticker orphaned.
No change to CoalescingBuffer itself.

Tests
New TestFullMessageClosesBuffer:
- test_full_message_stops_ticker: after a Full, the buffer is reaped and its ticker task is done() (not orphaned).
- test_full_is_terminal_publish_no_trailing_deltas: buffered deltas publish before the Full; the Full is the terminal publish.
Full streaming suite: 33 passed (tests/lib/core/services/adk/test_streaming.py).

Scope
Deliberately narrow: just the StreamTaskMessageFull orphan + ordering fix, split out from the broader buffer work in #418 for easier review.

Greptile Summary
- StreamingTaskMessageContext now drains and closes the coalescing buffer before publishing a terminal StreamTaskMessageFull message.
- close() reaps the coalescing buffer even when the context was already marked closed.

Confidence Score: 5/5
The streaming lifecycle changes are narrow, directly covered by targeted tests, and do not introduce broader API or persistence risk.
The changes address buffer draining, ticker cleanup, and terminal publish ordering in the affected context, with focused tests exercising the intended edge cases.
Reviews (3): Last reviewed commit: "fix(streaming): re-check _closed under l..."