fix(amber): Python internal marker replay during reconfiguration#4547
Merged
Yicong-Huang merged 2 commits intoApr 28, 2026
Conversation
c7da7a5 to
05ba9f4
Compare
aglinxinyuan
approved these changes
Apr 28, 2026
Contributor
aglinxinyuan
left a comment
There was a problem hiding this comment.
Do you think the comment in #4424 (comment) is valid?
Contributor
Author
I replied there. I think the comment is not valid, and the true issue in $4424 is not about |
This was referenced Apr 28, 2026
aglinxinyuan
added a commit
that referenced
this pull request
Apr 30, 2026
### What changes were proposed in this PR? Restores reliable state-output emission for Python operators after the #4552 revert. After this PR, both per-input-state outputs (`Operator.process_state(...)`) and the end-of-input-port output (`Operator.produce_state_on_finish(...)`) reach downstream channels. `MainLoop.process_input_state` previously did two `_switch_context()` calls with the read of `current_output_state` in between. The executor only writes that field during the *second* switch — so `MainLoop` always captured the previous cycle's value, and the finish-state set on `EndChannel` ended up in `current_output_state` after `MainLoop` had returned, never to be read again. This PR collapses the read to a single switch + read-after, drops the duplicate post-init and end-of-body switches in `DataProcessor.run`, and makes the run-loop's input dispatch peek-then-consume so `current_internal_marker` keeps the atomic single-consume semantics whose absence was the root cause of #4545. <details> <summary>History — third attempt at this fix</summary> - #4421 reported that a Python operator could process its first state input but not its second. - PR #4424 added three `_switch_context()` calls to keep `MainLoop` and `DataProcessor` in sync, closed #4421, but changed `current_internal_marker` lifetime and broke the source-propagation case in `ReconfigurationSpec` (#4545). - PR #4547 tried to restore atomic marker consumption on top of #4424 and re-enabled the source-propagation case in `ReconfigurationSpec`. CI continued to fail. - PR #4552 reverted #4424 outright as a stop-gap. State-processing is back to its pre-#4424 broken state — see #4559. </details> ### Any related issues, documentation, discussions? Fixes #4559. Follow-up to #4421 / #4424 / #4545 / #4547 / #4552. ### How was this PR tested? Existing `core/runnables/test_main_loop.py` tests pass unchanged. Added three new tests: - `test_process_state_can_emit_multiple_states` — stub-level coverage of the #4421 "second state not processed" scenario. - `test_main_loop_thread_can_process_state` — full real-thread coverage of state DataElements and `produce_state_on_finish` on `EndChannel`. Times out on plain `main` (#4559); passes on this branch. - `test_main_loop_thread_can_process_state_after_tuple` — coverage for the mixed `tuple → state` input sequence. `ReconfigurationSpec`'s source-propagation case (re-enabled in #4547) should be re-run on this branch to confirm the new handshake does not re-introduce #4545. ### Was this PR authored or co-authored using generative AI tooling? Generated-by: Anthropic Claude Opus 4.7
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What changes were proposed in this PR?
This PR fixes the Python reconfiguration hang reported in #4545 and explicitly re-enables the regression test that was temporarily ignored in #4546.
The regression came from
#4424(ef66190f22), which changed the lifetime of Pythoncurrent_internal_marker. After that change,get_internal_marker()stopped consuming the marker on read, and the main loop deferred cleanup until after replaying internal channel markers.For Python source operators, that allowed an internal
EndChannelmarker to remain visible across the pause and reconfiguration window. When the reconfiguration ECM was processed, the stale marker could be observed and replayed again, which corrupted end-of-stream handling and caused the workflow to hang.This PR restores the expected one-time consumption behavior by:
get_internal_marker()consumecurrent_internal_markerwhen it is readshould propagate reconfiguration through a source operator in workflowinReconfigurationSpecfromignoreback to a normal enabled testAny related issues, documentation, discussions?
Fixes #4545.
Regression introduced by #4424.
Re-enables the temporary test disable from #4546 after fixing the underlying lifecycle bug.
How was this PR tested?
Tested with existing Scala tests using Java 11:
WorkflowExecutionService/testOnly org.apache.texera.amber.engine.e2e.ReconfigurationSpecThis run included the re-enabled
should propagate reconfiguration through a source operator in workflowcase. The fullReconfigurationSpecpassed on the rebased branch (5/5passed,0ignored).Was this PR authored or co-authored using generative AI tooling?
Generated-by: OpenAI Codex (GPT-5)