Skip to content

Commit c5e5c90

Browse files
some cleanup
1 parent f2a56ac commit c5e5c90

5 files changed

Lines changed: 24 additions & 24 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#### Bug fixes
44

55
- Added an app-update database migration to repair stale open assignment-history rows in existing leader databases. These stale rows were caused by the new "re-assignment" dialog box on the Pioreactor's page; the biggest impact was duplicate logs showing up on the Logs UI page.
6+
- Fixed repeated in-process actions, such as dosing/pump runs, so that they clean up MQTT listeners, signal handlers, and short-lived MQTT clients more reliably, preventing runaway localhost socket/file-descriptor growth in long-lived jobs.
67

78
### 26.3.3
89

core/pioreactor/pubsub.py

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ def shutdown(self) -> None:
5454
self._reset_sockets(sockpair_only=True)
5555

5656
def loop_stop(self) -> MQTTErrorCode:
57+
# fast exits
5758
thread = self._thread
5859
if thread is None:
5960
return MQTTErrorCode.MQTT_ERR_INVAL
@@ -285,6 +286,18 @@ def subscribe_and_callback(
285286
"""
286287
assert callable(callback), "callback should be callable - do you need to change the order of arguments?"
287288

289+
def remove_callback_subscription(client: Client, topic: str) -> None:
290+
# This cleanup assumes effective ownership of this topic filter on the provided client.
291+
# Paho stores one callback per topic filter, and unsubscribe() removes the filter for the
292+
# whole client. On a genuinely shared long-lived client, this can clobber another caller's
293+
# callback/subscription if they reused the same topic. Today we accept that exclusivity risk
294+
# to avoid unbounded listener growth, but it is a known failure mode of this teardown path.
295+
with suppress(KeyError, ValueError):
296+
client.message_callback_remove(topic)
297+
298+
with suppress(ValueError):
299+
client.unsubscribe(topic)
300+
288301
def wrap_callback(actual_callback: Callable[[pt.MQTTMessage], Any]) -> Callable[..., Any]:
289302
def _callback(client: Client, userdata: dict[str, Any], message: pt.MQTTMessage) -> Any:
290303
try:
@@ -346,26 +359,13 @@ def on_subscribe(
346359
if on_cleanup is not None:
347360

348361
def cleanup_subscription(topic: str = topic) -> None:
349-
_remove_callback_subscription(client, topic)
362+
remove_callback_subscription(client, topic)
350363

351364
on_cleanup.append(cleanup_subscription)
352365

353366
return client
354367

355368

356-
def _remove_callback_subscription(client: Client, topic: str) -> None:
357-
# This cleanup assumes effective ownership of this topic filter on the provided client.
358-
# Paho stores one callback per topic filter, and unsubscribe() removes the filter for the
359-
# whole client. On a genuinely shared long-lived client, this can clobber another caller's
360-
# callback/subscription if they reused the same topic. Today we accept that exclusivity risk
361-
# to avoid unbounded listener growth, but it is a known failure mode of this teardown path.
362-
with suppress(KeyError, ValueError):
363-
client.message_callback_remove(topic)
364-
365-
with suppress(ValueError):
366-
client.unsubscribe(topic)
367-
368-
369369
def prune_retained_messages(topics_to_prune: str = "#") -> None:
370370
topics = []
371371

core/pioreactor/utils/__init__.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ def __init__(
210210
self.unit = unit
211211
self.experiment = experiment
212212
self.name = name
213-
self.state = st("init")
213+
self.state = st.INIT
214214
self.exit_event = Event()
215215
self._source = source
216216
self.is_long_running_job = is_long_running_job
@@ -269,7 +269,7 @@ def __init__(
269269
)
270270
assert self.mqtt_client is not None
271271

272-
self.state = st("init")
272+
self.state = st.INIT
273273
self.publish_setting("$state", self.state)
274274

275275
# Teardown for signal handlers and passive MQTT listeners is centralized in __exit__.
@@ -290,13 +290,13 @@ def _on_disconnect(self, *args: object) -> None:
290290
self._exit()
291291

292292
def __enter__(self) -> Self:
293-
self.state = st("ready")
293+
self.state = st.READY
294294
self.publish_setting("$state", self.state)
295295

296296
return self
297297

298298
def __exit__(self, *args: object) -> None:
299-
self.state = st("disconnected")
299+
self.state = st.DISCONNECTED
300300
self._exit()
301301
try:
302302
self.publish_setting("$state", self.state)
@@ -335,8 +335,8 @@ def start_passive_listeners(self) -> None:
335335

336336
def _remove_passive_listeners(self) -> None:
337337
while self._mqtt_cleanup_callables:
338-
cleanup = self._mqtt_cleanup_callables.pop()
339-
cleanup()
338+
_cleanup = self._mqtt_cleanup_callables.pop()
339+
_cleanup()
340340

341341
def _remove_signal_handlers(self) -> None:
342342
if not self._registered_signal_handlers:

core/tests/test_automation_yamls.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from functools import cache
44
from typing import Any
55

6+
import pytest
67
from pioreactor.automations import * # noqa: F403, F401
78
from pioreactor.background_jobs.dosing_automation import available_dosing_automations
89
from pioreactor.background_jobs.dosing_automation import DosingAutomationJobContrib
@@ -46,6 +47,7 @@ def get_automation_yaml_filename(type_: str, automation_name: str) -> str:
4647
raise FileNotFoundError(f"Unable to locate YAML for automation '{automation_name}' in '{type_}'.")
4748

4849

50+
@pytest.mark.slow
4951
def test_automations_and_their_yamls_have_the_same_data() -> None:
5052
try:
5153
for type_, available_automations in [

core/tests/test_dosing_automation.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1032,10 +1032,7 @@ def publish(self, topic: str, payload=None, qos: int = 0, **kwargs):
10321032
stop_messages.append((topic, payload, qos))
10331033
return None
10341034

1035-
def loop_stop(self) -> None:
1036-
return None
1037-
1038-
def disconnect(self) -> None:
1035+
def shutdown(self) -> None:
10391036
return None
10401037

10411038
class StubAutomation(DosingAutomationJob):

0 commit comments

Comments
 (0)