Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import fs
import importlib
import inspect
import itertools
import sys
from cached_property import cached_property
from fs.base import FS
Expand All @@ -29,10 +30,22 @@


class ExecutorManager:
# Process-wide monotonically increasing counter used to generate the
# tmp module names ExecutorManager hands to importlib. Making this a
# class-level counter (rather than a per-instance counter that always
# restarts at 1) guarantees that no two ExecutorManager instances in
# the same Python process can collide on `udf-v1`. Without that
# guarantee, the second instance hits the "module already loaded"
# branch of importlib and the post-clear+reload path can return a
# stale class on Python 3.11 (see #4705).
#
# Single-process counters are atomic in CPython under the GIL; we
# don't expect cross-thread contention on this anyway.
_module_name_counter = itertools.count(1)

def __init__(self):
self.executor: Optional[Operator] = None
self.operator_module_name: Optional[str] = None
self.executor_version: int = 0 # incremental only

@cached_property
def fs(self) -> FS:
Expand Down Expand Up @@ -60,11 +73,13 @@ def fs(self) -> FS:

def gen_module_file_name(self) -> Tuple[str, str]:
"""
Generate a UUID to be used as udf source code file.
Generate a unique module name and corresponding tmp file name.
Names come from a process-wide monotonic counter so they never
collide with any module already in `sys.modules`, even when
multiple ExecutorManager instances live in the same process.
:return Tuple[str, str]: the pair of module_name and file_name.
"""
self.executor_version += 1
module_name = f"udf-v{self.executor_version}"
module_name = f"udf-v{next(ExecutorManager._module_name_counter)}"
file_name = f"{module_name}.py"
return module_name, file_name

Expand All @@ -84,13 +99,10 @@ def load_executor_definition(self, code: str) -> type(Operator):
f"{Path(self.fs.getsyspath('/')).joinpath(file_name)}."
)

if module_name in sys.modules:
executor_module = importlib.import_module(module_name)
executor_module.__dict__.clear()
executor_module.__dict__["__name__"] = module_name
executor_module = importlib.reload(executor_module)
else:
executor_module = importlib.import_module(module_name)
# gen_module_file_name guarantees module_name is unique across
# the process, so import_module will always cleanly load source
# from the tmp fs we just wrote — no re-import / reload dance.
executor_module = importlib.import_module(module_name)
self.operator_module_name = module_name

executors = list(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ def test_initialization(self, executor_manager):
"""Test that ExecutorManager initializes correctly."""
assert executor_manager.executor is None
assert executor_manager.operator_module_name is None
assert executor_manager.executor_version == 0

def test_reject_r_tuple_language(self, executor_manager):
"""Test that 'r-tuple' language is rejected with ImportError when plugin is not available."""
Expand Down Expand Up @@ -163,8 +162,11 @@ def test_accept_python_language_regular_operator(self, executor_manager):

# Verify executor was initialized
assert executor_manager.executor is not None
assert executor_manager.operator_module_name == "udf-v1"
assert executor_manager.executor_version == 1
# Module name comes from a process-wide counter, so it has the
# right shape but its exact value depends on what other tests
# have run in the same pytest session.
assert executor_manager.operator_module_name is not None
assert executor_manager.operator_module_name.startswith("udf-v")
assert executor_manager.executor.is_source is False

def test_accept_python_language_source_operator(self, executor_manager):
Expand All @@ -176,8 +178,8 @@ def test_accept_python_language_source_operator(self, executor_manager):

# Verify executor was initialized
assert executor_manager.executor is not None
assert executor_manager.operator_module_name == "udf-v1"
assert executor_manager.executor_version == 1
assert executor_manager.operator_module_name is not None
assert executor_manager.operator_module_name.startswith("udf-v")
assert executor_manager.executor.is_source is True

def test_reject_other_unsupported_languages(self, executor_manager):
Expand All @@ -200,18 +202,26 @@ def test_reject_other_unsupported_languages(self, executor_manager):
pass

def test_gen_module_file_name_increments(self, executor_manager):
"""Test that module file names increment correctly."""
module1, file1 = executor_manager.gen_module_file_name()
assert module1 == "udf-v1"
assert file1 == "udf-v1.py"
"""Test that module file names increment monotonically.

The counter is process-wide so the absolute starting value
depends on prior tests in the same pytest session; only the
relative ordering matters for correctness.
"""
module1, file1 = executor_manager.gen_module_file_name()
module2, file2 = executor_manager.gen_module_file_name()
assert module2 == "udf-v2"
assert file2 == "udf-v2.py"

module3, file3 = executor_manager.gen_module_file_name()
assert module3 == "udf-v3"
assert file3 == "udf-v3.py"

def version(module_name: str) -> int:
return int(module_name.removeprefix("udf-v"))

v1 = version(module1)
assert version(module2) == v1 + 1
assert version(module3) == v1 + 2

assert file1 == f"{module1}.py"
assert file2 == f"{module2}.py"
assert file3 == f"{module3}.py"

def test_is_concrete_operator_static_method(self):
"""Test the is_concrete_operator static method."""
Expand Down Expand Up @@ -246,3 +256,136 @@ def test_source_operator_mismatch_raises_error(self, executor_manager):
language="python",
)
assert "SourceOperator API" in str(exc_info.value)


REPLACEMENT_OPERATOR_CODE = """
from pytexera import *

class ReplacementOperator(UDFOperatorV2):
def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]:
yield tuple_
"""

NO_OPERATOR_CODE = """
def helper():
return 42
"""

TWO_OPERATORS_CODE = """
from pytexera import *

class FirstOperator(UDFOperatorV2):
def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]:
yield tuple_

class SecondOperator(UDFOperatorV2):
def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]:
yield tuple_
"""


class TestUpdateExecutor:
"""Test suite for ExecutorManager.update_executor.

Notes on test isolation: the existing TestExecutorManager fixture cannot
fully clean up the udf-vN modules it imports (its `hasattr(manager, "_fs")`
cleanup guard is buggy — the actual cached_property key is `fs`), so a
given udf-v1 module may already live in sys.modules with a path attached
to a previous test's tmp filesystem. These tests therefore avoid asserting
on attributes baked into a specific operator class and instead use
setattr/getattr-only semantics that hold regardless of which cached
module satisfies the import.
"""

@pytest.fixture
def initialized_manager(self):
manager = ExecutorManager()
manager.initialize_executor(
code=SAMPLE_OPERATOR_CODE, is_source=False, language="python"
)
# Stamp custom attributes on the live instance so the dict-preservation
# check works even if the underlying class came from a cached module.
manager.executor.runtime_field = "set-after-init"
manager.executor.counter = 6
yield manager
manager.close()

def test_update_preserves_pre_update_dict_state(self, initialized_manager):
before = initialized_manager.executor
before_dict = dict(before.__dict__)

initialized_manager.update_executor(
code=REPLACEMENT_OPERATOR_CODE, is_source=False
)

# update_executor reuses the prior __dict__ on a freshly instantiated
# operator — verify both halves: a NEW instance, but the OLD state.
assert initialized_manager.executor is not before
assert initialized_manager.executor.runtime_field == "set-after-init"
assert initialized_manager.executor.counter == 6
# Assert key presence explicitly so a missing key with an expected
# value of None doesn't slip past via dict.get()'s default.
after_dict = initialized_manager.executor.__dict__
for key, value in before_dict.items():
assert key in after_dict, f"key {key!r} missing after update"
assert after_dict[key] == value

def test_update_advances_module_name_monotonically(self, initialized_manager):
# The module-name counter is process-wide, so absolute values
# depend on prior tests in the same pytest session; only the
# relative bump matters.
before = initialized_manager.operator_module_name
assert before is not None and before.startswith("udf-v")

initialized_manager.update_executor(
code=REPLACEMENT_OPERATOR_CODE, is_source=False
)

after = initialized_manager.operator_module_name
assert after is not None and after.startswith("udf-v")
assert int(after.removeprefix("udf-v")) == int(before.removeprefix("udf-v")) + 1

def test_update_with_source_mismatch_raises_assertion(self, initialized_manager):
# The replacement code is a regular operator, but is_source=True asks
# the manager to treat it as a source operator. Same guardrail as
# initialize_executor.
with pytest.raises(AssertionError) as exc_info:
initialized_manager.update_executor(
code=REPLACEMENT_OPERATOR_CODE, is_source=True
)
assert "SourceOperator API" in str(exc_info.value)

def test_update_with_no_operator_class_raises_assertion(self, initialized_manager):
# load_executor_definition asserts exactly one Operator subclass exists
# in the module — an empty module trips that assertion.
with pytest.raises(AssertionError) as exc_info:
initialized_manager.update_executor(code=NO_OPERATOR_CODE, is_source=False)
assert "one and only one Operator" in str(exc_info.value)

def test_update_with_multiple_operator_classes_raises_assertion(
self, initialized_manager
):
with pytest.raises(AssertionError) as exc_info:
initialized_manager.update_executor(
code=TWO_OPERATORS_CODE, is_source=False
)
assert "one and only one Operator" in str(exc_info.value)

def test_repeated_updates_keep_carrying_the_running_state(
self, initialized_manager
):
# Update once, mutate the new instance, then update again — the second
# update must see the *latest* state, not the snapshot from before
# the first update.
initialized_manager.update_executor(
code=REPLACEMENT_OPERATOR_CODE, is_source=False
)
initialized_manager.executor.counter = 42
initialized_manager.executor.added_after_update = True

initialized_manager.update_executor(
code=REPLACEMENT_OPERATOR_CODE, is_source=False
)

assert initialized_manager.executor.counter == 42
assert initialized_manager.executor.added_after_update is True
Loading