-
Notifications
You must be signed in to change notification settings - Fork 26
fix: use the closed window in the eof response #360
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -234,6 +234,7 @@ class AccumulatorResult: | |
| "_result_queue", | ||
| "_consumer_future", | ||
| "_latest_watermark", | ||
| "_close_window", | ||
| ) | ||
|
|
||
| _future: Task | ||
|
|
@@ -242,6 +243,7 @@ class AccumulatorResult: | |
| _result_queue: NonBlockingIterator | ||
| _consumer_future: Task | ||
| _latest_watermark: datetime | ||
| _close_window: KeyedWindow | None | ||
|
|
||
| @property | ||
| def future(self) -> Task: | ||
|
|
@@ -310,6 +312,29 @@ def update_watermark(self, new_watermark: datetime): | |
| raise TypeError("new_watermark must be a datetime object") | ||
| self._latest_watermark = new_watermark | ||
|
|
||
| @property | ||
| def close_window(self) -> KeyedWindow | None: | ||
| """Returns the keyed window from the CLOSE request, if the task was closed. | ||
|
|
||
| Returns: | ||
| KeyedWindow | None: The window carried by the CLOSE request, echoed back in | ||
| the EOF response; None if the task has not received a CLOSE. | ||
| """ | ||
| return self._close_window | ||
|
|
||
| def set_close_window(self, window: KeyedWindow): | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
@close_window.setter
def close_window(self, window: KeyedWindow):
if not isinstance(window, KeyedWindow):
raise TypeError("window must be a KeyedWindow object")
self._close_window = window |
||
| """Stashes the CLOSE request's keyed window so the EOF response can echo it. | ||
|
|
||
| Args: | ||
| window (KeyedWindow): The keyed window from the CLOSE request. | ||
|
|
||
| Raises: | ||
| TypeError: If window is not a KeyedWindow object. | ||
| """ | ||
| if not isinstance(window, KeyedWindow): | ||
| raise TypeError("window must be a KeyedWindow object") | ||
| self._close_window = window | ||
|
|
||
|
|
||
| @dataclass | ||
| class AccumulatorRequest: | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,6 +4,7 @@ | |
| import grpc | ||
| import pytest | ||
| from google.protobuf import empty_pb2 as _empty_pb2 | ||
| from google.protobuf import timestamp_pb2 | ||
|
|
||
| from pynumaflow import setup_logging | ||
| from pynumaflow.accumulator import ( | ||
|
|
@@ -87,6 +88,69 @@ def request_generator_mixed(count, request, resetkey: bool = False): | |
| yield request | ||
|
|
||
|
|
||
| # Distinct, recognizable close-window values used to prove the EOF echoes the | ||
| # CLOSE request's window rather than the synthesized fallback window. | ||
| CLOSE_WINDOW_START_SECONDS = 1000000000 | ||
| CLOSE_WINDOW_END_SECONDS = 2000000000 | ||
| CLOSE_WINDOW_SLOT = "slot-7" | ||
|
|
||
| # The accumulator's global window carries an "infinite" end (chrono MAX_UTC, ~year 262137) | ||
| # whose seconds exceed Python datetime's range. Core sends this on OPEN/APPEND. | ||
| GLOBAL_WINDOW_END_SECONDS = 8210266876799 | ||
|
|
||
|
|
||
| def request_generator_custom_close(count, request): | ||
| """Yields OPEN + APPEND requests, then a CLOSE whose keyed window carries | ||
| distinct start/end/slot values (mirroring core sending a real | ||
| max_event_time + timeout window on close).""" | ||
| for i in range(count): | ||
| if i == 0: | ||
| request.operation.event = accumulator_pb2.AccumulatorRequest.WindowOperation.Event.OPEN | ||
| else: | ||
| request.operation.event = ( | ||
| accumulator_pb2.AccumulatorRequest.WindowOperation.Event.APPEND | ||
| ) | ||
| yield request | ||
|
|
||
| # CLOSE carrying a distinct keyed window to be echoed back in the EOF response. | ||
| request.operation.event = accumulator_pb2.AccumulatorRequest.WindowOperation.Event.CLOSE | ||
| request.operation.keyedWindow.start.CopyFrom( | ||
| timestamp_pb2.Timestamp(seconds=CLOSE_WINDOW_START_SECONDS) | ||
| ) | ||
| request.operation.keyedWindow.end.CopyFrom( | ||
| timestamp_pb2.Timestamp(seconds=CLOSE_WINDOW_END_SECONDS) | ||
| ) | ||
| request.operation.keyedWindow.slot = CLOSE_WINDOW_SLOT | ||
| yield request | ||
|
|
||
|
|
||
| def request_generator_infinite_then_close(count, request): | ||
| """OPEN + APPEND requests whose window end is the global 'infinite' sentinel | ||
| (chrono MAX_UTC, out of Python datetime range), then a CLOSE carrying a concrete, | ||
| representable window. Mirrors what real core sends to an accumulator.""" | ||
| request.operation.keyedWindow.end.CopyFrom( | ||
| timestamp_pb2.Timestamp(seconds=GLOBAL_WINDOW_END_SECONDS) | ||
| ) | ||
| for i in range(count): | ||
| if i == 0: | ||
| request.operation.event = accumulator_pb2.AccumulatorRequest.WindowOperation.Event.OPEN | ||
| else: | ||
| request.operation.event = ( | ||
| accumulator_pb2.AccumulatorRequest.WindowOperation.Event.APPEND | ||
| ) | ||
| yield request | ||
|
|
||
| request.operation.event = accumulator_pb2.AccumulatorRequest.WindowOperation.Event.CLOSE | ||
| request.operation.keyedWindow.start.CopyFrom( | ||
| timestamp_pb2.Timestamp(seconds=CLOSE_WINDOW_START_SECONDS) | ||
| ) | ||
| request.operation.keyedWindow.end.CopyFrom( | ||
| timestamp_pb2.Timestamp(seconds=CLOSE_WINDOW_END_SECONDS) | ||
| ) | ||
| request.operation.keyedWindow.slot = CLOSE_WINDOW_SLOT | ||
| yield request | ||
|
|
||
|
|
||
| def start_request() -> accumulator_pb2.AccumulatorRequest: | ||
| event_time_timestamp, watermark_timestamp = get_time_args() | ||
| window = accumulator_pb2.KeyedWindow( | ||
|
|
@@ -289,6 +353,80 @@ def test_accumulate_with_close(accumulator_stub) -> None: | |
| assert 1 == eof_count | ||
|
|
||
|
|
||
| def test_accumulate_close_echoes_eof_window(accumulator_stub) -> None: | ||
| """The EOF response must echo the exact KeyedWindow from the CLOSE request.""" | ||
| request = start_request() | ||
| generator_response = None | ||
| try: | ||
| generator_response = accumulator_stub.AccumulateFn( | ||
| request_iterator=request_generator_custom_close(count=5, request=request) | ||
| ) | ||
| except grpc.RpcError as e: | ||
| logging.error(e) | ||
|
|
||
| eof_count = 0 | ||
| for r in generator_response: | ||
| if r.EOF: | ||
| eof_count += 1 | ||
| assert r.window.start.seconds == CLOSE_WINDOW_START_SECONDS | ||
| assert r.window.end.seconds == CLOSE_WINDOW_END_SECONDS | ||
| assert r.window.slot == CLOSE_WINDOW_SLOT | ||
| assert list(r.window.keys) == ["test_key"] | ||
|
|
||
| assert 1 == eof_count | ||
|
|
||
|
|
||
| def test_accumulate_infinite_window_end_does_not_crash(accumulator_stub) -> None: | ||
| """A global window with an 'infinite' end (out of Python datetime range) on OPEN/APPEND | ||
| must not crash decoding; the stream completes and the EOF echoes the CLOSE window.""" | ||
| request = start_request() | ||
| generator_response = None | ||
| try: | ||
| generator_response = accumulator_stub.AccumulateFn( | ||
| request_iterator=request_generator_infinite_then_close(count=5, request=request) | ||
| ) | ||
| except grpc.RpcError as e: | ||
| logging.error(e) | ||
|
|
||
| count = 0 | ||
| eof_count = 0 | ||
| for r in generator_response: | ||
| if r.EOF: | ||
| eof_count += 1 | ||
| assert r.window.start.seconds == CLOSE_WINDOW_START_SECONDS | ||
| assert r.window.end.seconds == CLOSE_WINDOW_END_SECONDS | ||
| assert r.window.slot == CLOSE_WINDOW_SLOT | ||
| elif r.payload.value: | ||
| count += 1 | ||
|
|
||
| # All 5 datums were processed and exactly one EOF was emitted (no crash). | ||
| assert 5 == count | ||
| assert 1 == eof_count | ||
|
|
||
|
|
||
| def test_accumulate_eof_window_fallback_without_close(accumulator_stub) -> None: | ||
| """When the stream closes without a CLOSE (e.g. shutdown), the EOF window falls | ||
| back to the synthesized window (start=epoch(0), slot='slot-0').""" | ||
| request = start_request() | ||
| generator_response = None | ||
| try: | ||
| generator_response = accumulator_stub.AccumulateFn( | ||
| request_iterator=request_generator(count=5, request=request, send_close=False) | ||
| ) | ||
| except grpc.RpcError as e: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Don't ignore these errors. If there's an error, the test should fail, right? |
||
| logging.error(e) | ||
|
|
||
| eof_count = 0 | ||
| for r in generator_response: | ||
| if r.EOF: | ||
| eof_count += 1 | ||
| assert r.window.start.seconds == 0 | ||
| assert r.window.slot == "slot-0" | ||
| assert list(r.window.keys) == ["test_key"] | ||
|
|
||
| assert 1 == eof_count | ||
|
|
||
|
|
||
| def test_accumulate_append_without_open(accumulator_stub) -> None: | ||
| request = start_request_without_open() | ||
| generator_response = None | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you can do it like this:
This value is not known/set at initialization of
`AccumulatorResult`, right? With the above, it's not part of the constructor.