From f5dab86385b8888e8125063fba142ef9fbabcd34 Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Mon, 20 Apr 2026 02:36:50 -0700 Subject: [PATCH 1/5] init --- amber/src/main/python/core/runnables/data_processor.py | 1 + amber/src/main/python/core/runnables/main_loop.py | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/amber/src/main/python/core/runnables/data_processor.py b/amber/src/main/python/core/runnables/data_processor.py index 4399b1a3a2f..815e85a6446 100644 --- a/amber/src/main/python/core/runnables/data_processor.py +++ b/amber/src/main/python/core/runnables/data_processor.py @@ -100,6 +100,7 @@ def process_state(self, state: State) -> None: self._context.worker_id, self._context.console_message_manager.print_buf, ): + self._switch_context() self._set_output_state(executor.process_state(state, port_id)) except Exception as err: diff --git a/amber/src/main/python/core/runnables/main_loop.py b/amber/src/main/python/core/runnables/main_loop.py index d73c655734f..2f2391cf7b4 100644 --- a/amber/src/main/python/core/runnables/main_loop.py +++ b/amber/src/main/python/core/runnables/main_loop.py @@ -241,6 +241,7 @@ def _process_tuple(self, tuple_: Tuple) -> None: def _process_state(self, state_: State) -> None: self.context.state_processing_manager.current_input_state = state_ + self._switch_context() self.process_input_state() self._check_and_process_control() @@ -329,7 +330,7 @@ def _process_ecm(self, ecm_element: ECMElement): if ecm.ecm_type != EmbeddedControlMessageType.NO_ALIGNMENT: self.context.pause_manager.resume(PauseType.ECM_PAUSE) - + self._switch_context() if self.context.tuple_processing_manager.current_internal_marker: { StartChannel: self._process_start_channel, From 42cc3557672686ba250f86b8054888be51faf34d Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Mon, 20 Apr 2026 12:52:42 -0700 Subject: [PATCH 2/5] fix fmt --- .../python/core/runnables/test_main_loop.py | 61 +++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/amber/src/main/python/core/runnables/test_main_loop.py b/amber/src/main/python/core/runnables/test_main_loop.py index 5ad0afec9bc..d903d6e58da 100644 --- a/amber/src/main/python/core/runnables/test_main_loop.py +++ b/amber/src/main/python/core/runnables/test_main_loop.py @@ -25,6 +25,8 @@ from core.models import ( DataFrame, InternalQueue, + State, + StateFrame, Tuple, ) from core.models.internal_queue import ( @@ -1035,6 +1037,65 @@ def test_main_loop_thread_can_process_single_tuple_with_binary( reraise() + @pytest.mark.timeout(2) + def test_process_state_can_emit_multiple_states( + self, + main_loop, + output_queue, + mock_data_output_channel, + monkeypatch, + ): + class DummyExecutor: + @staticmethod + def process_state(state: State, port: int) -> State: + output_state = State() + output_state["value"] = state["value"] + 1 + output_state["port"] = port + return output_state + + main_loop.context.executor_manager.executor = DummyExecutor() + monkeypatch.setattr(main_loop, "_check_and_process_control", lambda: None) + monkeypatch.setattr( + main_loop.context.output_manager, + "emit_state", + lambda state: [(mock_data_output_channel.to_worker_id, StateFrame(state))], + ) + + switch_count = {"value": 0} + + def fake_switch_context(): + switch_count["value"] += 1 + if switch_count["value"] % 3 == 2: + current_input_state = ( + main_loop.context.state_processing_manager.current_input_state + ) + main_loop.context.state_processing_manager.current_output_state = ( + DummyExecutor.process_state(current_input_state, 0) + ) + + monkeypatch.setattr(main_loop, "_switch_context", fake_switch_context) + + first_state = State() + first_state["value"] = 1 + second_state = State() + second_state["value"] = 41 + + main_loop._process_state(first_state) + main_loop._process_state(second_state) + + first_output: DataElement = output_queue.get() + second_output: DataElement = output_queue.get() + + assert first_output.tag == mock_data_output_channel + assert isinstance(first_output.payload, StateFrame) + assert first_output.payload.frame["value"] == 2 + assert first_output.payload.frame["port"] == 0 + + assert second_output.tag == mock_data_output_channel + assert isinstance(second_output.payload, StateFrame) + assert second_output.payload.frame["value"] == 42 + assert second_output.payload.frame["port"] == 0 + @staticmethod def send_pause( command_sequence, From 624203d05dc97a9f01853fe2fffadd81c883a683 Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Fri, 24 Apr 2026 16:44:52 -0700 Subject: [PATCH 3/5] fix: avoid consuming internal markers during ECM handling --- amber/src/main/python/core/runnables/main_loop.py | 1 - 1 file changed, 1 deletion(-) diff --git a/amber/src/main/python/core/runnables/main_loop.py b/amber/src/main/python/core/runnables/main_loop.py index 2f2391cf7b4..1e0867fc0e0 100644 --- a/amber/src/main/python/core/runnables/main_loop.py +++ b/amber/src/main/python/core/runnables/main_loop.py @@ -330,7 +330,6 @@ def _process_ecm(self, ecm_element: ECMElement): if ecm.ecm_type != EmbeddedControlMessageType.NO_ALIGNMENT: self.context.pause_manager.resume(PauseType.ECM_PAUSE) - self._switch_context() if self.context.tuple_processing_manager.current_internal_marker: { StartChannel: self._process_start_channel, From f221293473922065257b66df2b35aca4e69d49d7 Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Fri, 24 Apr 2026 17:10:15 -0700 Subject: [PATCH 4/5] fix fmt --- amber/src/main/python/core/runnables/main_loop.py | 1 + 1 file changed, 1 insertion(+) diff --git a/amber/src/main/python/core/runnables/main_loop.py b/amber/src/main/python/core/runnables/main_loop.py index 1e0867fc0e0..48c18407210 100644 --- a/amber/src/main/python/core/runnables/main_loop.py +++ b/amber/src/main/python/core/runnables/main_loop.py @@ -330,6 +330,7 @@ def _process_ecm(self, ecm_element: ECMElement): if ecm.ecm_type != EmbeddedControlMessageType.NO_ALIGNMENT: self.context.pause_manager.resume(PauseType.ECM_PAUSE) + if self.context.tuple_processing_manager.current_internal_marker: { StartChannel: self._process_start_channel, From b40da177987ed31864937a6408be25b5323e147d Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Fri, 24 Apr 2026 17:33:20 -0700 Subject: [PATCH 5/5] fix fmt --- .../core/architecture/managers/tuple_processing_manager.py | 3 +-- amber/src/main/python/core/runnables/main_loop.py | 1 + 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/amber/src/main/python/core/architecture/managers/tuple_processing_manager.py b/amber/src/main/python/core/architecture/managers/tuple_processing_manager.py index a67949e6717..3d4c9eee26a 100644 --- a/amber/src/main/python/core/architecture/managers/tuple_processing_manager.py +++ b/amber/src/main/python/core/architecture/managers/tuple_processing_manager.py @@ -33,8 +33,7 @@ def __init__(self): self.finished_current: Event = Event() def get_internal_marker(self) -> Optional[InternalMarker]: - ret, self.current_internal_marker = self.current_internal_marker, None - return ret + return self.current_internal_marker def get_input_tuple(self) -> Optional[Tuple]: ret, self.current_input_tuple = self.current_input_tuple, None diff --git a/amber/src/main/python/core/runnables/main_loop.py b/amber/src/main/python/core/runnables/main_loop.py index 48c18407210..c1ec2bb472f 100644 --- a/amber/src/main/python/core/runnables/main_loop.py +++ b/amber/src/main/python/core/runnables/main_loop.py @@ -336,6 +336,7 @@ def _process_ecm(self, ecm_element: ECMElement): StartChannel: self._process_start_channel, EndChannel: self._process_end_channel, }[type(self.context.tuple_processing_manager.current_internal_marker)]() + self.context.tuple_processing_manager.current_internal_marker = None def _send_ecm_to_data_channels( self, method_name: str, alignment: EmbeddedControlMessageType