[Feature][Python] Implement EventListener in python sdk #688
… in python_java_utils
…er utilities # Conflicts: # api/src/main/java/org/apache/flink/agents/api/resource/python/PythonResourceAdapter.java # runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonResourceAdapterImpl.java # runtime/src/test/java/org/apache/flink/agents/runtime/ResourceCacheTest.java
- Move event conversion to a public method 'convertJsonToPythonEvent' for listener support - Optimize performance by reusing a static ObjectMapper instance - Clean up redundant ObjectMapper instantiations
- Implement PythonEventListenerWrapper in PythonBridgeManager to bridge events to Python - Add logic to initialize and register Python listeners in ActionExecutionOperator - Optimize Java-to-Python event conversion by performing it once per event notification
fa2cb94 to 680c866
Hi, @twosom. Thanks for supporting EventListener in python sdk. Overall, I think the current design has become somewhat overly complex. In my view, we should have a single I think some of the current complexity stems from supporting That would let us drop The cost is that listeners must live at module top level (which the PR already enforces against Additionally, I believe there is no need to support event listeners in the Additionally, I believe the current testing is insufficient. We have some unit tests, but no end-to-end tests. For this cross-language scenario, we need to add real tests that submit jobs to the remote environment in
Thanks for the PR!
A few thoughts after going through the diff:
1. Java and Python listeners end up mutually exclusive at the operator level. The if (pythonBridge.isInitialized()) branch in ActionExecutionOperator#open (lines 197–208) calls either initForPythonEventListeners or eventRouter.initEventListeners, never both. pythonBridge.isInitialized() is true whenever the plan contains a Python action, a Python resource, or Mem0 config, so a user who registers a Java listener FQN under any of those conditions sees their listener silently never fire. On top of that, PythonBridgeManager#initForPythonEventListeners will throw ArrayIndexOutOfBoundsException on parts[1] for a Java FQN with no : in it. The event-listeners key is being asked to mean two different things based on unrelated runtime state. What do you think about splitting it: by descriptor shape, a python: prefix on Python entries, or two separate config keys?
2. No e2e coverage for the Java→Python listener path. initForPythonEventListeners, PythonEventListenerWrapper, toPythonEventContext, and the new branch in ActionExecutionOperator#open are only exercised by Python unit tests and a LocalRunner smoke test. None of it touches a real ActionExecutionOperator before merge, and that's the whole reason this feature exists. Could you add a scenario under python/flink_agents/e2e_tests/e2e_tests_resource_cross_language that submits a job with a Python listener and asserts on_event_processed fires per event?
3. The str(MyListener) identifier protocol is doing the work of a function. EventListenerMeta.__str__ emits module:Class.on_event_processed, and _resolve_module reverses it with __main__-by-file-stem plus an attribute-search across all loaded modules. A standard Python FQN ("my_module.MyClass") plus importlib.import_module + getattr + rpartition(".") would collapse the whole protocol — the metaclass, the resolver, and all the fallbacks. The current cost (listeners must live at module top level) is already enforced by the <locals> check. Hiding the descriptor inside str() also bites anyone who later logs str(MyListener) expecting <class …>. Would a classmethod (MyListener.descriptor()) or a free listener_id(cls) helper work better?
4. PythonResourceAdapter is picking up non-resource concerns. It gains toPythonEventContext() and initPythonEventListener(), neither of which is a resource. They happen to need the same interpreter, but that's the only thing they share with the rest of this interface. Once a second non-resource feature reaches in for its own bootstrapping, the name stops describing what the type does. Could we either extract a sibling (e.g. PythonEventListenerAdapter) or just call pythonInterpreter.invoke(...) directly from PythonBridgeManager for these two?
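To make point 3 concrete, here is a rough sketch of what the FQN-based protocol could look like. The names resolve_listener and listener_id are hypothetical (they are not in the PR), and fractions.Fraction only stands in for a user-defined listener class so the snippet runs on its own.

```python
import importlib
from fractions import Fraction  # stand-in for a user-defined listener class


def listener_id(cls: type) -> str:
    """Hypothetical helper: build a plain dotted path for a top-level class."""
    # Nested or function-local classes cannot be re-imported by name.
    if "." in cls.__qualname__:
        raise ValueError(f"listener must be a module-level class: {cls.__qualname__}")
    return f"{cls.__module__}.{cls.__qualname__}"


def resolve_listener(fqn: str) -> type:
    """Hypothetical helper: turn "my_module.MyClass" back into the class."""
    module_name, _, class_name = fqn.rpartition(".")
    if not module_name:
        raise ValueError(f"expected 'module.Class', got {fqn!r}")
    module = importlib.import_module(module_name)
    return getattr(module, class_name)


# Round trip: the descriptor is just the importable dotted path.
descriptor = listener_id(Fraction)
resolved = resolve_listener(descriptor)
```

With this shape, str(MyListener) keeps its default meaning and no sys.modules scan is needed. The known gap is a class defined in a script run as __main__, which would still need a separate decision.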
module = sys.modules["__main__"]
first_part = class_qualname.split(".")[0]
if not hasattr(module, first_part):
    for m in list(sys.modules.values()):
Both the __main__ and ImportError fallbacks walk sys.modules.values() and return the first module that has class_qualname.split(".")[0] as an attribute. If two modules top-level-export a class with the same name — not an unusual user mistake — the result is non-deterministic and silent: wrong listener class instantiated and notified. What invariant are we relying on here? If it's "module top-level uniqueness across the whole interpreter," can we raise on collision instead of letting first-match win?
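If the fallback scan stays, something along these lines would make the collision loud instead of silent. This is only a sketch: find_unique_class is a hypothetical name, and it assumes that the same class object re-exported from several modules should not count as a collision.

```python
import sys
import types
from typing import Iterable, Optional


def find_unique_class(
    name: str, modules: Optional[Iterable[types.ModuleType]] = None
) -> object:
    """Return the one object bound to `name` at module top level, or raise."""
    candidates = list(sys.modules.values()) if modules is None else list(modules)
    found = {}  # id(obj) -> (module name, obj); re-exports of one object collapse
    for module in candidates:
        # Read the module dict directly so lazy module __getattr__ hooks never run.
        namespace = getattr(module, "__dict__", None)
        if not isinstance(namespace, dict) or name not in namespace:
            continue
        obj = namespace[name]
        found.setdefault(id(obj), (getattr(module, "__name__", "?"), obj))
    if not found:
        raise LookupError(f"no loaded module defines {name!r}")
    if len(found) > 1:
        owners = sorted(mod for mod, _ in found.values())
        raise LookupError(f"{name!r} is ambiguous; defined in: {', '.join(owners)}")
    ((_, obj),) = found.values()
    return obj
```

The error message lists every module that defines the name, so the user can rename one of the classes or switch to an explicit module path.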
with patch("inspect.getmodule", return_value=mock_module):
    # Should fall back to "__main__" if __file__ is missing
    assert str(MainListenerMock) == "__main__:MainListenerMock.on_event_processed"
This test claims to cover the missing-__file__ branch, but MagicMock() auto-creates __file__ on access — hasattr(module_obj, "__file__") is always True and the fallback branch never runs. The assertion happens to pass because inspect.getmodule is patched, but for the wrong reason. Switching to spec=ModuleType and explicitly deleting __file__ would make the test actually exercise the fallback path.
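A related option is to skip the mock and hand the code a real module object that has no __file__. The sketch below assumes the production logic is roughly "use the file stem when __file__ is set, otherwise __main__"; module_label is a hypothetical stand-in for that logic, not the PR's function.

```python
import types
from pathlib import Path


def module_label(module: types.ModuleType) -> str:
    """Hypothetical stand-in: file stem if __file__ is set, else "__main__"."""
    file = getattr(module, "__file__", None)
    return Path(file).stem if file else "__main__"


def make_module_without_file(name: str = "fake_main") -> types.ModuleType:
    """Build a real module object that is guaranteed to lack __file__."""
    module = types.ModuleType(name)
    # A fresh ModuleType carries no __file__; pop defensively to state the intent.
    module.__dict__.pop("__file__", None)
    return module


fake_module = make_module_without_file()
label_without_file = module_label(fake_module)

# Setting __file__ afterwards exercises the other branch with the same object.
fake_module.__file__ = "/tmp/my_job.py"
label_with_file = module_label(fake_module)
```

Passing fake_module as the return_value of the inspect.getmodule patch would let the test assert on the precondition (no __file__) before asserting on the result.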
Timestamp of when the event occurred.
"""

eventType: str
camelCase here clashes with the rest of flink_agents/api, which is snake_case throughout. User code reading context.eventType will stand out. Would event_type with alias="eventType" satisfy the Java JSON contract while keeping the Python attribute idiomatic?
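For illustration, here is a stdlib-only sketch of the same idea: a snake_case attribute in Python with a camelCase key on the wire. EventContextSketch, its timestamp field name, and the two helper methods are assumptions made for the example. If the real model is a pydantic model, a field alias would express this mapping declaratively instead.

```python
import json
from dataclasses import dataclass


@dataclass(frozen=True)
class EventContextSketch:
    """Illustrative only: Python-side names are snake_case."""

    event_type: str
    timestamp: str  # field name assumed; the diff only shows its docstring

    @classmethod
    def from_java_json(cls, payload: str) -> "EventContextSketch":
        # The Java side is assumed to send camelCase keys.
        raw = json.loads(payload)
        return cls(event_type=raw["eventType"], timestamp=raw["timestamp"])

    def to_java_json(self) -> str:
        # Serialize back with the camelCase key the JSON contract expects.
        return json.dumps({"eventType": self.event_type, "timestamp": self.timestamp})


context = EventContextSketch.from_java_json(
    '{"eventType": "InputEvent", "timestamp": "2024-01-01T00:00:00Z"}'
)
```

User code then reads context.event_type, and the eventType spelling stays confined to the serialization boundary.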
        listenerFunction.call(listenerObject, pythonEventContext, pythonEvent);
    }
} catch (Exception e) {
    throw new RuntimeException(e);
throw new RuntimeException(e) strips the descriptor of the listener that failed. The wrapper already holds listenerEntries — including the current entry's module:qualname in the message would turn "which of N listeners threw?" from a multi-minute debug into a one-line answer.
try:
    listener.on_event_processed(event_context, event)
except Exception:
    logger.exception("Error in EventListener execution")
This swallows and logs; EventRouter#notifyEventProcessed on the Java side lets listener exceptions propagate. Same interface, two failure semantics depending on the runner. What's the rationale for diverging here? If it's deliberate, worth documenting the asymmetry in event_listener.md so users of both runners aren't surprised.
static final String FROM_JAVA_EVENT_CONTEXT = PYTHON_MODULE_PREFIX + "from_java_event_context";

static final String INSTANTIATE_PYTHON_EVENT_LISTER =
Typo — INSTANTIATE_PYTHON_EVENT_LISTER is missing an N. Should be INSTANTIATE_PYTHON_EVENT_LISTENER.
Linked issue: #687
Purpose of change
This PR implements the EventListener mechanism for both Java and Python runtimes, ensuring API consistency across different language SDKs. It enables developers to monitor and react to events synchronously during the agent's event lifecycle.
Tests
ok
API
Documentation
doc-needed
doc-not-needed
doc-included