What happened?
Python operators silently drop state outputs. Two scenarios:
- With several state inputs in a row, each state's output is observed one cycle late, and the last state's output is never emitted unless another state arrives after it.
- On EndChannel, the state returned by Operator.produce_state_on_finish(...) is always dropped, so any operator that wants to emit a final state when its input port closes (state-aggregation, summary, terminal-reduce) cannot.
MainLoop.process_input_state reads current_output_state between its two _switch_context() calls, but the executor writes that field only during the second switch, so MainLoop always sees the previous cycle's value (or None for the first state). On EndChannel, produce_state_on_finish writes the slot after MainLoop has already returned from process_input_state, and nothing reads the slot afterwards.
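To make the one-cycle lag concrete, here is a toy, single-threaded model of the ordering described above, in which each `next()` on a generator stands in for one `_switch_context()`. `SharedSlots`, `toy_executor`, and `process_states` are hypothetical names, not Texera code, and the real MainLoop/DataProcessor hand-off is thread-based; the sketch only reproduces the read/write ordering.

```python
from dataclasses import dataclass
from typing import Any, Generator, List, Optional, Tuple


@dataclass
class SharedSlots:
    # Hypothetical stand-in for the fields MainLoop and the executor share.
    input_state: Optional[Any] = None
    current_output_state: Optional[Any] = None


def toy_executor(slots: SharedSlots) -> Generator[None, None, None]:
    # Each next() call models one _switch_context() into the executor.
    while True:
        state = slots.input_state
        yield  # end of the first switch: input consumed, nothing written yet
        slots.current_output_state = state  # the write happens in the second switch
        yield  # end of the second switch


def process_states(
    states: List[Any], read_after_second_switch: bool
) -> Tuple[List[Any], Optional[Any]]:
    slots = SharedSlots()
    executor = toy_executor(slots)
    observed: List[Any] = []
    for state in states:
        slots.input_state = state
        next(executor)  # first switch
        if not read_after_second_switch:
            # Reported ordering: read between the two switches.
            observed.append(slots.current_output_state)
        next(executor)  # second switch: the executor writes the output now
        if read_after_second_switch:
            # Alternative ordering: read only after the write has happened.
            observed.append(slots.current_output_state)
    # Whatever is left in the slot after the last cycle has no further reader.
    return observed, slots.current_output_state
```

With the reported ordering, `process_states(["s1", "s2", "s3"], read_after_second_switch=False)` observes `[None, "s1", "s2"]` and leaves `"s3"` unread in the slot, which is the same position the state from produce_state_on_finish ends up in on EndChannel. Reading after the second switch observes all three states. This is only one way to restore the ordering and is not necessarily what the handshake fix on the branch does.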
This is invisible with the default Operator.produce_state_on_finish (which returns None) and with EchoOperator for the same reason, which is why test_main_loop_thread_can_process_messages does not catch it.
History
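Follow-up to the lifecycle work tracked in #4421 / #4424 / #4545 / #4547 / #4552:
- … _switch_context() calls to keep MainLoop and DataProcessor in sync. It closed #4421 (Python operator unable to process more than one State) but changed the current_internal_marker lifetime and broke the source-propagation case in ReconfigurationSpec (#4545, ReconfigurationSpec: source-propagation case hangs after UDF processes EndChannel).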
How to reproduce?
Wire a custom executor whose produce_state_on_finish returns a non-None state, send a state followed by EndChannel, and watch the output queue:
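```python
from core.models import State  # assumed import path in Texera's Python package; adjust if needed


class FinishStateOperator:
    @staticmethod
    def process_tuple(tuple_, port):
        yield tuple_  # pass tuples through unchanged

    @staticmethod
    def process_state(state, port):
        return state  # echo each incoming state

    @staticmethod
    def produce_state_on_finish(port):
        # Return a non-None state when the input port closes, so a drop is observable.
        s = State()
        s.add("finish_marker", "ran")
        return s

    @staticmethod
    def on_finish(port):
        yield

    @staticmethod
    def close():
        pass
```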
Drain the control-channel replies (port_completed × 2 + worker_execution_completed) and pull from the data sub-queue. Expected: a StateFrame with finish_marker == "ran". Actual: only the (lagged) state DataElements, after which output_queue.get() blocks indefinitely.
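Because an unbounded `output_queue.get()` turns the missing StateFrame into a hang, a small helper along these lines can turn it into a failed assertion instead. `get_matching` is a hypothetical helper, not part of Texera's test suite; it only assumes the output queue behaves like a standard `queue.Queue`. The predicate that recognizes a StateFrame carrying `finish_marker == "ran"` depends on the frame and State types in the actual test harness, so it is left to the caller.

```python
import queue
import time
from typing import Any, Callable


def get_matching(
    q: "queue.Queue[Any]", predicate: Callable[[Any], bool], timeout: float = 5.0
) -> Any:
    """Pull items until one satisfies `predicate`; fail instead of blocking forever."""
    deadline = time.monotonic() + timeout
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise AssertionError(f"no matching item within {timeout}s")
        try:
            # Bounded wait: raises queue.Empty once the remaining time runs out.
            item = q.get(timeout=remaining)
        except queue.Empty:
            raise AssertionError(f"no matching item within {timeout}s") from None
        if predicate(item):
            return item  # earlier, non-matching items (e.g. lagged states) are skipped
```

Non-matching items such as the lagged state DataElements are consumed and skipped, so the helper is best suited to asserting that the final state eventually arrives, not to checking the exact order of everything before it.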
A reproducing pytest case is test_main_loop_thread_can_process_state on aglinxinyuan/texera#state-handshake-redesign: on plain main it times out; on the branch with the handshake fix applied it passes.
Version
1.1.0-incubating (Pre-release/Master)
Commit Hash (Optional)
92affb5