Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ def __init__(self):
self.finished_current: Event = Event()

def get_internal_marker(self) -> Optional[InternalMarker]:
ret, self.current_internal_marker = self.current_internal_marker, None
return ret
Comment on lines -36 to -37
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we change this?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To address the comment. You can check the comment(there is only one).

return self.current_internal_marker

def get_input_tuple(self) -> Optional[Tuple]:
ret, self.current_input_tuple = self.current_input_tuple, None
Expand Down
1 change: 1 addition & 0 deletions amber/src/main/python/core/runnables/data_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ def process_state(self, state: State) -> None:
self._context.worker_id,
self._context.console_message_manager.print_buf,
):
Comment thread
aglinxinyuan marked this conversation as resolved.
self._switch_context()
self._set_output_state(executor.process_state(state, port_id))

except Exception as err:
Expand Down
2 changes: 2 additions & 0 deletions amber/src/main/python/core/runnables/main_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ def _process_tuple(self, tuple_: Tuple) -> None:

def _process_state(self, state_: State) -> None:
self.context.state_processing_manager.current_input_state = state_
self._switch_context()
self.process_input_state()
Comment thread
aglinxinyuan marked this conversation as resolved.
self._check_and_process_control()

Expand Down Expand Up @@ -341,6 +342,7 @@ def _process_ecm(self, ecm_element: ECMElement):
StartChannel: self._process_start_channel,
EndChannel: self._process_end_channel,
}[type(self.context.tuple_processing_manager.current_internal_marker)]()
self.context.tuple_processing_manager.current_internal_marker = None

def _send_ecm_to_data_channels(
self, method_name: str, alignment: EmbeddedControlMessageType
Expand Down
61 changes: 61 additions & 0 deletions amber/src/main/python/core/runnables/test_main_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
from core.models import (
DataFrame,
InternalQueue,
State,
StateFrame,
Tuple,
)
from core.models.internal_queue import (
Expand Down Expand Up @@ -1036,6 +1038,65 @@ def test_main_loop_thread_can_process_single_tuple_with_binary(

reraise()

@pytest.mark.timeout(2)
def test_process_state_can_emit_multiple_states(
self,
main_loop,
output_queue,
mock_data_output_channel,
monkeypatch,
):
class DummyExecutor:
@staticmethod
def process_state(state: State, port: int) -> State:
output_state = State()
output_state["value"] = state["value"] + 1
output_state["port"] = port
return output_state

main_loop.context.executor_manager.executor = DummyExecutor()
monkeypatch.setattr(main_loop, "_check_and_process_control", lambda: None)
monkeypatch.setattr(
main_loop.context.output_manager,
"emit_state",
lambda state: [(mock_data_output_channel.to_worker_id, StateFrame(state))],
)

switch_count = {"value": 0}

def fake_switch_context():
switch_count["value"] += 1
if switch_count["value"] % 3 == 2:
current_input_state = (
main_loop.context.state_processing_manager.current_input_state
)
main_loop.context.state_processing_manager.current_output_state = (
DummyExecutor.process_state(current_input_state, 0)
)
Comment thread
aglinxinyuan marked this conversation as resolved.

monkeypatch.setattr(main_loop, "_switch_context", fake_switch_context)

first_state = State()
first_state["value"] = 1
second_state = State()
second_state["value"] = 41

main_loop._process_state(first_state)
main_loop._process_state(second_state)

first_output: DataElement = output_queue.get()
second_output: DataElement = output_queue.get()

assert first_output.tag == mock_data_output_channel
assert isinstance(first_output.payload, StateFrame)
assert first_output.payload.frame["value"] == 2
assert first_output.payload.frame["port"] == 0

assert second_output.tag == mock_data_output_channel
assert isinstance(second_output.payload, StateFrame)
assert second_output.payload.frame["value"] == 42
assert second_output.payload.frame["port"] == 0

@staticmethod
def send_pause(
command_sequence,
Expand Down
Loading