diff --git a/amber/src/main/python/core/architecture/managers/executor_manager.py b/amber/src/main/python/core/architecture/managers/executor_manager.py index eb1363d0a68..538d7b0bf12 100644 --- a/amber/src/main/python/core/architecture/managers/executor_manager.py +++ b/amber/src/main/python/core/architecture/managers/executor_manager.py @@ -18,6 +18,7 @@ import fs import importlib import inspect +import itertools import sys from cached_property import cached_property from fs.base import FS @@ -29,10 +30,22 @@ class ExecutorManager: + # Process-wide monotonically increasing counter used to generate the + # tmp module names ExecutorManager hands to importlib. Making this a + # class-level counter (rather than a per-instance counter that always + # restarts at 1) guarantees that no two ExecutorManager instances in + # the same Python process can collide on `udf-v1`. Without that + # guarantee, the second instance hits the "module already loaded" + # branch of importlib and the post-clear+reload path can return a + # stale class on Python 3.11 (see #4705). + # + # Single-process counters are atomic in CPython under the GIL; we + # don't expect cross-thread contention on this anyway. + _module_name_counter = itertools.count(1) + def __init__(self): self.executor: Optional[Operator] = None self.operator_module_name: Optional[str] = None - self.executor_version: int = 0 # incremental only @cached_property def fs(self) -> FS: @@ -60,11 +73,13 @@ def fs(self) -> FS: def gen_module_file_name(self) -> Tuple[str, str]: """ - Generate a UUID to be used as udf source code file. + Generate a unique module name and corresponding tmp file name. + Names come from a process-wide monotonic counter so they never + collide with any module already in `sys.modules`, even when + multiple ExecutorManager instances live in the same process. :return Tuple[str, str]: the pair of module_name and file_name. """ - self.executor_version += 1 - module_name = f"udf-v{self.executor_version}" + module_name = f"udf-v{next(ExecutorManager._module_name_counter)}" file_name = f"{module_name}.py" return module_name, file_name @@ -84,13 +99,10 @@ def load_executor_definition(self, code: str) -> type(Operator): f"{Path(self.fs.getsyspath('/')).joinpath(file_name)}." ) - if module_name in sys.modules: - executor_module = importlib.import_module(module_name) - executor_module.__dict__.clear() - executor_module.__dict__["__name__"] = module_name - executor_module = importlib.reload(executor_module) - else: - executor_module = importlib.import_module(module_name) + # gen_module_file_name guarantees module_name is unique across + # the process, so import_module will always cleanly load source + # from the tmp fs we just wrote — no re-import / reload dance. + executor_module = importlib.import_module(module_name) self.operator_module_name = module_name executors = list( diff --git a/amber/src/main/python/core/architecture/managers/test_executor_manager.py b/amber/src/main/python/core/architecture/managers/test_executor_manager.py index 901f768a216..f376f644682 100644 --- a/amber/src/main/python/core/architecture/managers/test_executor_manager.py +++ b/amber/src/main/python/core/architecture/managers/test_executor_manager.py @@ -84,7 +84,6 @@ def test_initialization(self, executor_manager): """Test that ExecutorManager initializes correctly.""" assert executor_manager.executor is None assert executor_manager.operator_module_name is None - assert executor_manager.executor_version == 0 def test_reject_r_tuple_language(self, executor_manager): """Test that 'r-tuple' language is rejected with ImportError when plugin is not available.""" @@ -163,8 +162,11 @@ def test_accept_python_language_regular_operator(self, executor_manager): # Verify executor was initialized assert executor_manager.executor is not None - assert executor_manager.operator_module_name == "udf-v1" - assert executor_manager.executor_version == 1 + # Module name comes from a process-wide counter, so it has the + # right shape but its exact value depends on what other tests + # have run in the same pytest session. + assert executor_manager.operator_module_name is not None + assert executor_manager.operator_module_name.startswith("udf-v") assert executor_manager.executor.is_source is False def test_accept_python_language_source_operator(self, executor_manager): @@ -176,8 +178,8 @@ def test_accept_python_language_source_operator(self, executor_manager): # Verify executor was initialized assert executor_manager.executor is not None - assert executor_manager.operator_module_name == "udf-v1" - assert executor_manager.executor_version == 1 + assert executor_manager.operator_module_name is not None + assert executor_manager.operator_module_name.startswith("udf-v") assert executor_manager.executor.is_source is True def test_reject_other_unsupported_languages(self, executor_manager): @@ -200,18 +202,26 @@ def test_reject_other_unsupported_languages(self, executor_manager): pass def test_gen_module_file_name_increments(self, executor_manager): - """Test that module file names increment correctly.""" - module1, file1 = executor_manager.gen_module_file_name() - assert module1 == "udf-v1" - assert file1 == "udf-v1.py" + """Test that module file names increment monotonically. + The counter is process-wide so the absolute starting value + depends on prior tests in the same pytest session; only the + relative ordering matters for correctness. + """ + module1, file1 = executor_manager.gen_module_file_name() module2, file2 = executor_manager.gen_module_file_name() - assert module2 == "udf-v2" - assert file2 == "udf-v2.py" - module3, file3 = executor_manager.gen_module_file_name() - assert module3 == "udf-v3" - assert file3 == "udf-v3.py" + + def version(module_name: str) -> int: + return int(module_name.removeprefix("udf-v")) + + v1 = version(module1) + assert version(module2) == v1 + 1 + assert version(module3) == v1 + 2 + + assert file1 == f"{module1}.py" + assert file2 == f"{module2}.py" + assert file3 == f"{module3}.py" def test_is_concrete_operator_static_method(self): """Test the is_concrete_operator static method.""" @@ -246,3 +256,136 @@ def test_source_operator_mismatch_raises_error(self, executor_manager): language="python", ) assert "SourceOperator API" in str(exc_info.value) + + +REPLACEMENT_OPERATOR_CODE = """ +from pytexera import * + +class ReplacementOperator(UDFOperatorV2): + def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]: + yield tuple_ +""" + +NO_OPERATOR_CODE = """ +def helper(): + return 42 +""" + +TWO_OPERATORS_CODE = """ +from pytexera import * + +class FirstOperator(UDFOperatorV2): + def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]: + yield tuple_ + +class SecondOperator(UDFOperatorV2): + def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]: + yield tuple_ +""" + + +class TestUpdateExecutor: + """Test suite for ExecutorManager.update_executor. + + Notes on test isolation: the existing TestExecutorManager fixture cannot + fully clean up the udf-vN modules it imports (its `hasattr(manager, "_fs")` + cleanup guard is buggy — the actual cached_property key is `fs`), so a + given udf-v1 module may already live in sys.modules with a path attached + to a previous test's tmp filesystem. These tests therefore avoid asserting + on attributes baked into a specific operator class and instead use + setattr/getattr-only semantics that hold regardless of which cached + module satisfies the import. + """ + + @pytest.fixture + def initialized_manager(self): + manager = ExecutorManager() + manager.initialize_executor( + code=SAMPLE_OPERATOR_CODE, is_source=False, language="python" + ) + # Stamp custom attributes on the live instance so the dict-preservation + # check works even if the underlying class came from a cached module. + manager.executor.runtime_field = "set-after-init" + manager.executor.counter = 6 + yield manager + manager.close() + + def test_update_preserves_pre_update_dict_state(self, initialized_manager): + before = initialized_manager.executor + before_dict = dict(before.__dict__) + + initialized_manager.update_executor( + code=REPLACEMENT_OPERATOR_CODE, is_source=False + ) + + # update_executor reuses the prior __dict__ on a freshly instantiated + # operator — verify both halves: a NEW instance, but the OLD state. + assert initialized_manager.executor is not before + assert initialized_manager.executor.runtime_field == "set-after-init" + assert initialized_manager.executor.counter == 6 + # Assert key presence explicitly so a missing key with an expected + # value of None doesn't slip past via dict.get()'s default. + after_dict = initialized_manager.executor.__dict__ + for key, value in before_dict.items(): + assert key in after_dict, f"key {key!r} missing after update" + assert after_dict[key] == value + + def test_update_advances_module_name_monotonically(self, initialized_manager): + # The module-name counter is process-wide, so absolute values + # depend on prior tests in the same pytest session; only the + # relative bump matters. + before = initialized_manager.operator_module_name + assert before is not None and before.startswith("udf-v") + + initialized_manager.update_executor( + code=REPLACEMENT_OPERATOR_CODE, is_source=False + ) + + after = initialized_manager.operator_module_name + assert after is not None and after.startswith("udf-v") + assert int(after.removeprefix("udf-v")) == int(before.removeprefix("udf-v")) + 1 + + def test_update_with_source_mismatch_raises_assertion(self, initialized_manager): + # The replacement code is a regular operator, but is_source=True asks + # the manager to treat it as a source operator. Same guardrail as + # initialize_executor. + with pytest.raises(AssertionError) as exc_info: + initialized_manager.update_executor( + code=REPLACEMENT_OPERATOR_CODE, is_source=True + ) + assert "SourceOperator API" in str(exc_info.value) + + def test_update_with_no_operator_class_raises_assertion(self, initialized_manager): + # load_executor_definition asserts exactly one Operator subclass exists + # in the module — an empty module trips that assertion. + with pytest.raises(AssertionError) as exc_info: + initialized_manager.update_executor(code=NO_OPERATOR_CODE, is_source=False) + assert "one and only one Operator" in str(exc_info.value) + + def test_update_with_multiple_operator_classes_raises_assertion( + self, initialized_manager + ): + with pytest.raises(AssertionError) as exc_info: + initialized_manager.update_executor( + code=TWO_OPERATORS_CODE, is_source=False + ) + assert "one and only one Operator" in str(exc_info.value) + + def test_repeated_updates_keep_carrying_the_running_state( + self, initialized_manager + ): + # Update once, mutate the new instance, then update again — the second + # update must see the *latest* state, not the snapshot from before + # the first update. + initialized_manager.update_executor( + code=REPLACEMENT_OPERATOR_CODE, is_source=False + ) + initialized_manager.executor.counter = 42 + initialized_manager.executor.added_after_update = True + + initialized_manager.update_executor( + code=REPLACEMENT_OPERATOR_CODE, is_source=False + ) + + assert initialized_manager.executor.counter == 42 + assert initialized_manager.executor.added_after_update is True