fix(amber): emit Python operator state outputs reliably #4560
DataProcessor previously parked at a per-task `finally: _switch_context()` in process_state / process_internal_marker / process_tuple after every task, so MainLoop needed two switches per state: one to drain DataProc out of that finally, and another to drive the next iteration. The read of current_output_state in MainLoop.process_input_state landed between those two switches and consequently saw the previous cycle's output, causing a one-state lag.

This change collapses the handshake to one switch:

- Drop the per-task `finally: _switch_context()` from process_state, process_internal_marker, and process_tuple. DataProc now always parks at the run-loop's end-of-body switch between tasks.
- Drop DataProcessor.run's post-init _switch_context. With the per-task finally switches gone, MainLoop's first switch lands DataProc directly in the while-loop, where it consumes whatever input MainLoop has just queued (state, marker, or tuple).
- Switch the run-loop input dispatch to peek-then-consume: read the current_internal_marker / current_input_state / current_input_tuple slots without consuming, then consume only the slot whose branch we take. Inputs that MainLoop populated but that are not handled in this iteration survive into the next iteration.
- Read current_output_state after the single switch in process_input_state. The switch returns once DataProc has run the executor and written the output, so the read sees the freshly produced state for this cycle (no more one-state lag).

Existing tests in test_main_loop.py pass unchanged.
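The lag described above can be illustrated with a toy model. This is only a sketch under loud assumptions: plain Python generators stand in for the thread-based `_switch_context()`, `next(worker)` plays the role of one switch, and the worker functions, the `slots` dict, and the `processed:` tag are all hypothetical names, not code from data_processor.py.

```python
from typing import Dict, Generator, List, Optional

Slots = Dict[str, Optional[str]]


def two_park_worker(slots: Slots) -> Generator[None, None, None]:
    """Toy worker that needs two resumes per task (hypothetical model)."""
    while True:
        yield  # park 1: stands in for the per-task `finally` switch
        yield  # park 2: stands in for the run-loop switch
        slots["output"] = f"processed:{slots['input']}"


def one_park_worker(slots: Slots) -> Generator[None, None, None]:
    """Toy worker with a single park between tasks (hypothetical model)."""
    while True:
        yield  # the only park between tasks
        slots["output"] = f"processed:{slots['input']}"


def drive_two_switches(states: List[str]) -> List[Optional[str]]:
    """Old handshake: the output is read between the two switches."""
    slots: Slots = {"input": None, "output": None}
    worker = two_park_worker(slots)
    next(worker)  # prime the worker so it sits at park 1
    seen: List[Optional[str]] = []
    for state in states:
        slots["input"] = state
        next(worker)  # switch 1: only moves the worker from park 1 to park 2
        seen.append(slots["output"])  # still the previous cycle's output
        next(worker)  # switch 2: the executor runs and writes the output
    return seen


def drive_one_switch(states: List[str]) -> List[Optional[str]]:
    """New handshake: one switch, then read."""
    slots: Slots = {"input": None, "output": None}
    worker = one_park_worker(slots)
    next(worker)  # prime the worker so it sits at its single park
    seen: List[Optional[str]] = []
    for state in states:
        slots["input"] = state
        next(worker)  # the executor runs and writes the output
        seen.append(slots["output"])  # fresh output for this cycle
    return seen
```

In this model `drive_two_switches` always reports the output of the previous state (and nothing for the first one), while `drive_one_switch` reports each state's own output.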
Add two tests that exercise the single-switch state handshake:

- test_process_state_can_emit_multiple_states: stubs out _switch_context to simulate DataProc consuming current_input_state and producing current_output_state on each switch, then drives _process_state twice to verify both states emit through process_input_state.
- test_main_loop_thread_can_process_state: full real-thread coverage. After setup, swaps in an in-process executor that tags processed states (process_state) and produces a finish marker on EndChannel (produce_state_on_finish). Sends four states and asserts each emits in order with the executor's tag, confirming the pipeline has no one-state lag. Then sends an EndChannel ECM and asserts the finish-marker state is emitted via _process_end_channel -> process_input_state.

Both tests use the State dataclass API (State() + state.add(...)).
This is not true any more. The test has been re-enabled in #4547.
…y removal

Adds test_main_loop_thread_can_process_state_after_tuple, which sends a tuple followed by four states and asserts all four states emit in order.

The single-switch state handshake assumes DataProcessor parks at the run-loop's end-of-body switch (line 65) between tasks. Without removing process_tuple's per-task `finally: self._switch_context()`, DataProcessor parks at line ~138 instead after a tuple. MainLoop's first switch for the next state cycle then wakes it from there (not from the run loop), and the first state after a tuple is dropped because current_output_state is empty when MainLoop reads it.

Verified: with process_tuple's finally restored, this test fails with "expected value=1, got value=2" -- the first state's output is silently dropped and the second state's output appears first.
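A toy reproduction of the dropped-first-state scenario may help reviewers follow the argument. It is a sketch only: generators stand in for the real context switch, `next(worker)` is one MainLoop switch, and `worker`, `emit_states_after_tuple`, the `slots` keys, and the `tuple_handler_parks` flag are hypothetical names introduced for illustration.

```python
from typing import Any, Dict, Generator, List


def worker(slots: Dict[str, Any], tuple_handler_parks: bool) -> Generator[None, None, None]:
    """Toy run loop; `tuple_handler_parks` restores the per-task tuple switch."""
    while True:
        if slots["tuple"] is not None:
            slots["tuple"] = None  # consume the tuple
            if tuple_handler_parks:
                yield  # stands in for process_tuple's `finally` switch
        elif slots["state"] is not None:
            value = slots["state"]
            slots["state"] = None  # consume the state
            slots["out"] = value  # the "executor" writes its output
        yield  # stands in for the run-loop's end-of-body switch


def emit_states_after_tuple(tuple_handler_parks: bool) -> List[int]:
    """Send one tuple, then states 1..4, with one switch per input."""
    slots: Dict[str, Any] = {"tuple": None, "state": None, "out": None}
    dp = worker(slots, tuple_handler_parks)
    slots["tuple"] = ("row",)
    next(dp)  # the first switch starts the worker directly in its loop
    emitted: List[int] = []
    for value in (1, 2, 3, 4):
        slots["state"] = value  # MainLoop overwrites the single state slot
        next(dp)  # the single switch for this state cycle
        if slots["out"] is not None:  # read-after-switch
            emitted.append(slots["out"])
            slots["out"] = None
    return emitted
```

With `tuple_handler_parks=True` the first switch after the tuple only moves the toy worker out of the tuple park, so state 1 is overwritten before it is ever processed and the first emitted value is 2, which mirrors the failure message quoted above.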
… switch

Earlier commits on this branch removed the per-task `finally: self._switch_context()` from process_state, process_internal_marker, and process_tuple, and relied on the run-loop's end-of-body switch (line 65) to park DataProcessor between tasks.

Per review on apache#4560, swap to the equivalent design that keeps the per-task finally switches and drops the run-loop's end-of-body switch instead. The diff to data_processor.py is now confined to DataProcessor.run -- the three task-handler bodies are unchanged from origin/main:

- process_state, process_internal_marker, process_tuple keep their original `finally: self._switch_context()`. Between tasks, DataProcessor parks at whichever per-task finally just ran.
- DataProcessor.run no longer does a post-init _switch_context (it would burn MainLoop's first switch on a no-op handshake) and no longer does the end-of-body _switch_context after each task (the per-task finally already plays that role). The body is just dispatch + peek-then-consume.

Both designs are functionally equivalent; this one minimizes the source diff and addresses the review concern about touching the tuple code path. All 8 tests in test_main_loop.py pass.
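For comparison, the "keep the per-task finally, drop the end-of-body switch" shape can be sketched in the same toy style. Again this is an illustration under assumptions, not the code in data_processor.py: generators replace the thread handshake, and `handle_tuple`, `handle_state`, `run_loop`, and `emit_after_tuple` are hypothetical names.

```python
from typing import Any, Dict, Generator, List


def handle_tuple(slots: Dict[str, Any]) -> Generator[None, None, None]:
    try:
        slots["tuple"] = None  # consume the tuple
    finally:
        yield  # per-task park, stands in for `finally: self._switch_context()`


def handle_state(slots: Dict[str, Any]) -> Generator[None, None, None]:
    try:
        value = slots["state"]
        slots["state"] = None  # consume the state
        slots["out"] = value  # the "executor" writes its output
    finally:
        yield  # per-task park


def run_loop(slots: Dict[str, Any]) -> Generator[None, None, None]:
    """Toy run loop: dispatch only, no post-init or end-of-body park."""
    while True:
        if slots["tuple"] is not None:
            yield from handle_tuple(slots)
        elif slots["state"] is not None:
            yield from handle_state(slots)
        else:
            raise RuntimeError("nothing queued")


def emit_after_tuple(values: List[int]) -> List[Any]:
    """Send one tuple, then each value as a state, one switch per input."""
    slots: Dict[str, Any] = {"tuple": None, "state": None, "out": None}
    dp = run_loop(slots)
    slots["tuple"] = ("row",)
    next(dp)  # the worker parks in handle_tuple's finally
    emitted: List[Any] = []
    for value in values:
        slots["state"] = value
        next(dp)  # leaves the previous park, runs this task, parks again
        emitted.append(slots["out"])  # read-after-switch sees fresh output
    return emitted
```

Because each handler parks exactly once and the loop body never parks, one switch per input is enough in this model, and the tuple-then-states sequence emits every state in order.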
… switch

Mirrors the same change on state-handshake-redesign (the upstream PR candidate, #4560 review feedback). Functionally equivalent to the prior design that removed all three per-task finally switches; this design keeps them and drops the run-loop's end-of-body _switch_context() instead, which keeps process_state / process_internal_marker / process_tuple unchanged from origin/main.
Updated.
The earlier short-line `assert end_channel_state["finish_marker"] == ...` fits on a single line under ruff's 88-column limit and does not need the parenthesized line-break form that was used. CI's `ruff format --check .` on the python matrix flagged this as the only formatting nit; collapse the assertion onto one line so `ruff format --check` is clean. No behavior change; all 8 tests still pass.
DataProcessor.run:

- Replace the post-init `_switch_context()` with a direct `_post_switch_context_checks()` call. This preserves the previous guarantee that any debug command queued during worker setup fires before the first task is processed, but does not consume a round trip on MainLoop's first switch (which would silently drop the first state/tuple/marker, see apache#4559).
- Add an explicit `queued != 1` runtime check at the top of each iteration. MainLoop is single-threaded and sets exactly one of current_internal_marker / current_input_state / current_input_tuple before switching, so the loop body should never see zero or more than one. The previous `else: raise RuntimeError("No marker or tuple to process.")` only guarded the zero case and didn't mention state.

test_main_loop.py:

- Move the `_StateProcessingExecutor` test helper to module scope and expose it via a `state_processing_executor` fixture, instead of redefining the class inline in each test. The two tests previously carried near-identical copies of it.
- Lower the timeouts on `test_main_loop_thread_can_process_state` and `test_main_loop_thread_can_process_state_after_tuple` from 5s to 2s -- both run in the millisecond range; 5s was just slack.
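The peek-then-consume dispatch with the `queued != 1` guard could look roughly like the sketch below. The three slot names come from the commit message; the `InputSlots` dataclass, the `take_single_input` helper, and the error text are hypothetical and only illustrate the check.

```python
from dataclasses import dataclass
from typing import Any, Optional, Tuple

SLOT_NAMES = (
    "current_internal_marker",
    "current_input_state",
    "current_input_tuple",
)


@dataclass
class InputSlots:
    """Hypothetical stand-in for the context fields MainLoop populates."""

    current_internal_marker: Optional[Any] = None
    current_input_state: Optional[Any] = None
    current_input_tuple: Optional[Any] = None


def take_single_input(slots: InputSlots) -> Tuple[str, Any]:
    """Peek at all three slots, then consume only the one that is set."""
    # Peek: inspect the slots without clearing anything.
    queued = [name for name in SLOT_NAMES if getattr(slots, name) is not None]
    if len(queued) != 1:
        raise RuntimeError(
            f"expected exactly one queued marker, state, or tuple; "
            f"found {len(queued)}: {queued}"
        )
    name = queued[0]
    value = getattr(slots, name)
    setattr(slots, name, None)  # consume only the slot whose branch we take
    return name, value
```

Raising on both the zero case and the more-than-one case turns a violated MainLoop invariant into an immediate error instead of a silently skipped input.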
@Yicong-Huang, please review again.
Move the test helper class from module scope into the `state_processing_executor` fixture body, per review on apache#4560.
Per review on apache#4560: `_post_switch_context_checks` is named for the post-switch case; calling it standalone before the first task is misleading. Add a dedicated `_pre_loop_checks()` that runs once after init, only for the debug-command check (no task has run yet, so no exception to surface).
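The split between the two check helpers might be sketched as follows. Only the method names `_pre_loop_checks` and `_post_switch_context_checks` come from this commit message; the `ToyDataProcessor` class, its fields, and the way debug commands and exceptions are recorded are assumptions made for illustration, not the real DataProcessor internals.

```python
from collections import deque
from typing import Deque, List, Optional


class ToyDataProcessor:
    """Hypothetical model of the two check entry points."""

    def __init__(self) -> None:
        self.debug_commands: Deque[str] = deque()
        self.pending_exception: Optional[Exception] = None
        self.handled_commands: List[str] = []
        self.surfaced_exceptions: List[Exception] = []

    def _check_debug_commands(self) -> None:
        # Drain every debug command queued so far, in arrival order.
        while self.debug_commands:
            self.handled_commands.append(self.debug_commands.popleft())

    def _pre_loop_checks(self) -> None:
        # Runs once after init. No task has run yet, so only the
        # debug-command check applies.
        self._check_debug_commands()

    def _post_switch_context_checks(self) -> None:
        # Runs after a switch returns. A task may have failed, so this
        # also surfaces a pending exception (recorded here, by assumption).
        self._check_debug_commands()
        if self.pending_exception is not None:
            self.surfaced_exceptions.append(self.pending_exception)
            self.pending_exception = None
```

Keeping the pre-loop helper separate means its name matches when it runs, while the post-switch helper stays reserved for checks that only make sense after a task has executed.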
We will merge after the release.
Can we merge now?
It was merged once the backport feature was introduced.
Sorry, I had cached state on my end and mistakenly saw this PR as still open, which it no longer is.
What changes were proposed in this PR?
Restores reliable state-output emission for Python operators after the #4552 revert. After this PR, both per-input-state outputs (`Operator.process_state(...)`) and the end-of-input-port output (`Operator.produce_state_on_finish(...)`) reach downstream channels.

`MainLoop.process_input_state` previously did two `_switch_context()` calls with the read of `current_output_state` in between. The executor only writes that field during the second switch, so `MainLoop` always captured the previous cycle's value, and the finish-state set on `EndChannel` ended up in `current_output_state` after `MainLoop` had returned, never to be read again. This PR collapses the read to a single switch + read-after, drops the duplicate post-init and end-of-body switches in `DataProcessor.run`, and makes the run-loop's input dispatch peek-then-consume so `current_internal_marker` keeps the atomic single-consume semantics whose absence was the root cause of #4545.

History: third attempt at this fix

- … `_switch_context()` calls to keep `MainLoop` and `DataProcessor` in sync, closed #4421 (Python operator unable to process more than one State), but changed `current_internal_marker` lifetime and broke the source-propagation case in `ReconfigurationSpec` (#4545, ReconfigurationSpec: source-propagation case hangs after UDF processes EndChannel).
- … `ReconfigurationSpec`. CI continued to fail.

Any related issues, documentation, discussions?
Fixes #4559. Follow-up to #4421 / #4424 / #4545 / #4547 / #4552.
How was this PR tested?
Existing `core/runnables/test_main_loop.py` tests pass unchanged. Added three new tests:

- `test_process_state_can_emit_multiple_states`: stub-level coverage of the #4421 (Python operator unable to process more than one State) "second state not processed" scenario.
- `test_main_loop_thread_can_process_state`: full real-thread coverage of state DataElements and `produce_state_on_finish` on `EndChannel`. Times out on plain `main` (#4559, Python operator silently drops state outputs); passes on this branch.
- `test_main_loop_thread_can_process_state_after_tuple`: coverage for the mixed `tuple → state` input sequence.

`ReconfigurationSpec`'s source-propagation case (re-enabled in #4547) should be re-run on this branch to confirm the new handshake does not re-introduce #4545.

Was this PR authored or co-authored using generative AI tooling?
Generated-by: Anthropic Claude Opus 4.7