fix: add missing context switches for repeated state processing#4424
Conversation
There was a problem hiding this comment.
Pull request overview
Fixes repeated Python State processing by realigning the MainLoop ↔ DataProcessor context-switch handshake so multiple states can be processed sequentially (closes #4421).
Changes:
- Add a pre-processing
_switch_context()inMainLoop._process_stateto keep state handoff synchronized across repeated inputs. - Add a
_switch_context()beforeexecutor.process_state(...)inDataProcessor.process_stateto align with the updated handshake. - Add a
_switch_context()after ECM pause resume to keep internal-marker handling in sync. - Add a regression test asserting multiple state outputs can be emitted in sequence.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
amber/src/main/python/core/runnables/main_loop.py |
Adds missing context switches in state processing and ECM handling to keep threads synchronized. |
amber/src/main/python/core/runnables/data_processor.py |
Adds a context switch before invoking executor.process_state to match the updated handshake. |
amber/src/main/python/core/runnables/test_main_loop.py |
Adds a regression test covering sequential state processing and emission. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Xiao-zhen-Liu
left a comment
There was a problem hiding this comment.
There might be a potential bug.
| if ecm.ecm_type != EmbeddedControlMessageType.NO_ALIGNMENT: | ||
| self.context.pause_manager.resume(PauseType.ECM_PAUSE) | ||
|
|
||
| self._switch_context() |
There was a problem hiding this comment.
I think this _switch_context() can break ECM handling because it gives DataProcessor a chance to consume current_internal_marker before MainLoop checks it below.
For StartChannel / EndChannel, the control handler sets tuple_processing_manager.current_internal_marker. But DataProcessor.run() reads it via get_internal_marker(), which also clears the field. So with this new switch, the sequence can become:
- MainLoop handles the ECM command and sets
current_internal_marker. - MainLoop switches to DataProcessor.
- DataProcessor calls
get_internal_marker()and clears the marker. - MainLoop resumes and sees
current_internal_marker is None. - MainLoop skips
_process_start_channel()/_process_end_channel().
That is risky because those MainLoop methods do more than just invoke the executor hook: _process_end_channel() also emits finish state/tuples, sends port completion messages, forwards downstream ECMs, and completes the worker. DataProcessor consuming the marker first can therefore leave the lifecycle handling incomplete or stuck waiting for a matching MainLoop-side switch.
It may be safer to avoid switching here before MainLoop has inspected/handled current_internal_marker, or to restructure the marker handoff so only one side is responsible for consuming it.
There was a problem hiding this comment.
I kept the existing structure and made the marker handoff non-consuming instead.
DataProcessor no longer clears current_internal_marker when it reads it, and MainLoop now clears the marker only after _process_start_channel() / _process_end_channel() finishes.
This keeps the ECM-side _switch_context() for the repeated-state fix, while avoiding the race you pointed out where DataProcessor could consume the marker before MainLoop handled the lifecycle logic.
There was a problem hiding this comment.
- DataProcessor calls get_internal_marker() and clears the marker.
- MainLoop resumes and sees current_internal_marker is None.
These steps are intentional. All those states/message should only be processed by one component (e.g., DP or CP), and each state/message can only be consumed or processed once.
If you need to trigger _process_start_channel() / _process_end_channel() in the main loop, it requires another message.
There was a problem hiding this comment.
ideally, besides waiting on debugger, where the thread has to wait on CP to receive the next message, at all places use switch context should have no harm besides wasted CPU. If you have message that could be processed by CP or DP, and using a wrong switch_context would cause the massage be handled by the wrong process, then I would blame the design of the message: The message should only be processed by one of CP or DP, not both.
| ret, self.current_internal_marker = self.current_internal_marker, None | ||
| return ret |
There was a problem hiding this comment.
why do we change this?
There was a problem hiding this comment.
To address the comment. You can check the comment(there is only one).
### What changes were proposed in this PR? This PR fixes the Python reconfiguration hang reported in #4545 and explicitly re-enables the regression test that was temporarily ignored in #4546. The regression came from `#4424` (`ef66190f22`), which changed the lifetime of Python `current_internal_marker`. After that change, `get_internal_marker()` stopped consuming the marker on read, and the main loop deferred cleanup until after replaying internal channel markers. For Python source operators, that allowed an internal `EndChannel` marker to remain visible across the pause and reconfiguration window. When the reconfiguration ECM was processed, the stale marker could be observed and replayed again, which corrupted end-of-stream handling and caused the workflow to hang. This PR restores the expected one-time consumption behavior by: - making `get_internal_marker()` consume `current_internal_marker` when it is read - removing the extra delayed cleanup after replaying internal channel markers in the Python main loop - changing `should propagate reconfiguration through a source operator in workflow` in `ReconfigurationSpec` from `ignore` back to a normal enabled test ### Any related issues, documentation, discussions? Fixes #4545. Regression introduced by #4424. Re-enables the temporary test disable from #4546 after fixing the underlying lifecycle bug. ### How was this PR tested? Tested with existing Scala tests using Java 11: - `WorkflowExecutionService/testOnly org.apache.texera.amber.engine.e2e.ReconfigurationSpec` This run included the re-enabled `should propagate reconfiguration through a source operator in workflow` case. The full `ReconfigurationSpec` passed on the rebased branch (`5/5` passed, `0` ignored). ### Was this PR authored or co-authored using generative AI tooling? Generated-by: OpenAI Codex (GPT-5)
### What changes were proposed in this PR? Restores reliable state-output emission for Python operators after the #4552 revert. After this PR, both per-input-state outputs (`Operator.process_state(...)`) and the end-of-input-port output (`Operator.produce_state_on_finish(...)`) reach downstream channels. `MainLoop.process_input_state` previously did two `_switch_context()` calls with the read of `current_output_state` in between. The executor only writes that field during the *second* switch — so `MainLoop` always captured the previous cycle's value, and the finish-state set on `EndChannel` ended up in `current_output_state` after `MainLoop` had returned, never to be read again. This PR collapses the read to a single switch + read-after, drops the duplicate post-init and end-of-body switches in `DataProcessor.run`, and makes the run-loop's input dispatch peek-then-consume so `current_internal_marker` keeps the atomic single-consume semantics whose absence was the root cause of #4545. <details> <summary>History — third attempt at this fix</summary> - #4421 reported that a Python operator could process its first state input but not its second. - PR #4424 added three `_switch_context()` calls to keep `MainLoop` and `DataProcessor` in sync, closed #4421, but changed `current_internal_marker` lifetime and broke the source-propagation case in `ReconfigurationSpec` (#4545). - PR #4547 tried to restore atomic marker consumption on top of #4424 and re-enabled the source-propagation case in `ReconfigurationSpec`. CI continued to fail. - PR #4552 reverted #4424 outright as a stop-gap. State-processing is back to its pre-#4424 broken state — see #4559. </details> ### Any related issues, documentation, discussions? Fixes #4559. Follow-up to #4421 / #4424 / #4545 / #4547 / #4552. ### How was this PR tested? Existing `core/runnables/test_main_loop.py` tests pass unchanged. Added three new tests: - `test_process_state_can_emit_multiple_states` — stub-level coverage of the #4421 "second state not processed" scenario. - `test_main_loop_thread_can_process_state` — full real-thread coverage of state DataElements and `produce_state_on_finish` on `EndChannel`. Times out on plain `main` (#4559); passes on this branch. - `test_main_loop_thread_can_process_state_after_tuple` — coverage for the mixed `tuple → state` input sequence. `ReconfigurationSpec`'s source-propagation case (re-enabled in #4547) should be re-run on this branch to confirm the new handshake does not re-introduce #4545. ### Was this PR authored or co-authored using generative AI tooling? Generated-by: Anthropic Claude Opus 4.7
What changes were proposed in this PR?
This PR fixes Python state processing in
MainLoop/DataProcessorso an operator can process multiple input states in sequence.The root cause is that the state-processing path was missing context switches that keep
MainLoopandDataProcessorin sync. Without those handoffs, the first state can be processed, but the second state does not get delivered correctly.This PR adds the missing
_switch_context()calls:MainLoopstarts processing an input stateDataProcessorinvokesexecutor.process_state(...)MainLoopcontinues internal-marker handling after ECM processingThese changes keep the state-processing handshake aligned across repeated state inputs.
Any related issues, documentation, discussions?
Closes #4421
How was this PR tested?
Added a regression test in
amber/src/main/python/core/runnables/test_main_loop.py:test_process_state_can_emit_multiple_statesRan:
pytest amber/src/main/python/core/runnables/test_main_loop.py -k process_state_can_emit_multiple_statespytest amber/src/main/python/core/runnables/test_main_loop.pyThe regression test passes on this branch and fails on
main, confirming that without the context-switch changes the operator cannot process two states correctly.Was this PR authored or co-authored using generative AI tooling?
Test case generated by ChatGPT.