fix: use the closed window in the eof response#360
Open
vaibhavtiwari33 wants to merge 2 commits into
Open
Conversation
Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #360 +/- ##
==========================================
- Coverage 92.74% 92.68% -0.06%
==========================================
Files 67 67
Lines 3514 3540 +26
Branches 228 232 +4
==========================================
+ Hits 3259 3281 +22
- Misses 190 192 +2
- Partials 65 67 +2 ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
BulkBeing
requested changes
Jun 18, 2026
| generator_response = accumulator_stub.AccumulateFn( | ||
| request_iterator=request_generator(count=5, request=request, send_close=False) | ||
| ) | ||
| except grpc.RpcError as e: |
Contributor
There was a problem hiding this comment.
Don't ignore these errors. If there's an error test should fail right?
| _result_queue: NonBlockingIterator | ||
| _consumer_future: Task | ||
| _latest_watermark: datetime | ||
| _close_window: KeyedWindow | None |
Contributor
There was a problem hiding this comment.
I think you can do like this:
_close_window: KeyedWindow | None = field(default=None, init=False)This value is not known/set at initialization of AccumulatorResult right? With above, it's not part of the constructor.
BulkBeing
reviewed
Jun 18, 2026
| """ | ||
| return self._close_window | ||
|
|
||
| def set_close_window(self, window: KeyedWindow): |
Contributor
There was a problem hiding this comment.
close_window is already a read-only property. Let's make this as well property setter ?
@close_window.setter
def close_window(self, window: KeyedWindow):
if not isinstance(window, KeyedWindow):
raise TypeError("window must be a KeyedWindow object")
self._close_window = window
BulkBeing
requested changes
Jun 18, 2026
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Based off of numaproj/numaflow-rs#177
Testing
Updated the stream sorter example's handler to the following:
Created a jitter source, which pauses emission of events with certain keys for 45 seconds. Total 5 keys in the system. Timeout set for the accumulator is 30s
Logged into stream sorter pod to track disk usage across 4 hours (I should've probably installed prometheus and metrics server to gather this detail in my local cluster but I was too lazy to do that for some reason) :
Disk usage
Memory usage
Overall values seem mostly consistent across time.