diff --git a/amber/src/main/python/core/architecture/managers/tuple_processing_manager.py b/amber/src/main/python/core/architecture/managers/tuple_processing_manager.py index a67949e6717..3d4c9eee26a 100644 --- a/amber/src/main/python/core/architecture/managers/tuple_processing_manager.py +++ b/amber/src/main/python/core/architecture/managers/tuple_processing_manager.py @@ -33,8 +33,7 @@ def __init__(self): self.finished_current: Event = Event() def get_internal_marker(self) -> Optional[InternalMarker]: - ret, self.current_internal_marker = self.current_internal_marker, None - return ret + return self.current_internal_marker def get_input_tuple(self) -> Optional[Tuple]: ret, self.current_input_tuple = self.current_input_tuple, None diff --git a/amber/src/main/python/core/runnables/data_processor.py b/amber/src/main/python/core/runnables/data_processor.py index 4399b1a3a2f..815e85a6446 100644 --- a/amber/src/main/python/core/runnables/data_processor.py +++ b/amber/src/main/python/core/runnables/data_processor.py @@ -100,6 +100,7 @@ def process_state(self, state: State) -> None: self._context.worker_id, self._context.console_message_manager.print_buf, ): + self._switch_context() self._set_output_state(executor.process_state(state, port_id)) except Exception as err: diff --git a/amber/src/main/python/core/runnables/main_loop.py b/amber/src/main/python/core/runnables/main_loop.py index cde28472061..7b22e086069 100644 --- a/amber/src/main/python/core/runnables/main_loop.py +++ b/amber/src/main/python/core/runnables/main_loop.py @@ -247,6 +247,7 @@ def _process_tuple(self, tuple_: Tuple) -> None: def _process_state(self, state_: State) -> None: self.context.state_processing_manager.current_input_state = state_ + self._switch_context() self.process_input_state() self._check_and_process_control() @@ -341,6 +342,7 @@ def _process_ecm(self, ecm_element: ECMElement): StartChannel: self._process_start_channel, EndChannel: self._process_end_channel, }[type(self.context.tuple_processing_manager.current_internal_marker)]() + self.context.tuple_processing_manager.current_internal_marker = None def _send_ecm_to_data_channels( self, method_name: str, alignment: EmbeddedControlMessageType diff --git a/amber/src/main/python/core/runnables/test_main_loop.py b/amber/src/main/python/core/runnables/test_main_loop.py index 5612e4b41ac..e6136b04207 100644 --- a/amber/src/main/python/core/runnables/test_main_loop.py +++ b/amber/src/main/python/core/runnables/test_main_loop.py @@ -26,6 +26,8 @@ from core.models import ( DataFrame, InternalQueue, + State, + StateFrame, Tuple, ) from core.models.internal_queue import ( @@ -1036,6 +1038,65 @@ def test_main_loop_thread_can_process_single_tuple_with_binary( reraise() + @pytest.mark.timeout(2) + def test_process_state_can_emit_multiple_states( + self, + main_loop, + output_queue, + mock_data_output_channel, + monkeypatch, + ): + class DummyExecutor: + @staticmethod + def process_state(state: State, port: int) -> State: + output_state = State() + output_state["value"] = state["value"] + 1 + output_state["port"] = port + return output_state + + main_loop.context.executor_manager.executor = DummyExecutor() + monkeypatch.setattr(main_loop, "_check_and_process_control", lambda: None) + monkeypatch.setattr( + main_loop.context.output_manager, + "emit_state", + lambda state: [(mock_data_output_channel.to_worker_id, StateFrame(state))], + ) + + switch_count = {"value": 0} + + def fake_switch_context(): + switch_count["value"] += 1 + if switch_count["value"] % 3 == 2: + current_input_state = ( + main_loop.context.state_processing_manager.current_input_state + ) + main_loop.context.state_processing_manager.current_output_state = ( + DummyExecutor.process_state(current_input_state, 0) + ) + + monkeypatch.setattr(main_loop, "_switch_context", fake_switch_context) + + first_state = State() + first_state["value"] = 1 + second_state = State() + second_state["value"] = 41 + + main_loop._process_state(first_state) + main_loop._process_state(second_state) + + first_output: DataElement = output_queue.get() + second_output: DataElement = output_queue.get() + + assert first_output.tag == mock_data_output_channel + assert isinstance(first_output.payload, StateFrame) + assert first_output.payload.frame["value"] == 2 + assert first_output.payload.frame["port"] == 0 + + assert second_output.tag == mock_data_output_channel + assert isinstance(second_output.payload, StateFrame) + assert second_output.payload.frame["value"] == 42 + assert second_output.payload.frame["port"] == 0 + @staticmethod def send_pause( command_sequence,