Skip to content

Commit 584aabd

Browse files
authored
Merge pull request #3304 from bjester/task-interruption-persistence-hotfixes
[hotfixes] Configure task behavior to acknowledge after execution to trigger requeuing when interrupted
2 parents 61b7852 + f382b19 commit 584aabd

3 files changed

Lines changed: 32 additions & 1 deletion

File tree

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ dummyusers:
1111
cd contentcuration/ && python manage.py loaddata contentcuration/fixtures/admin_user_token.json
1212

1313
prodceleryworkers:
14-
cd contentcuration/ && celery -A contentcuration worker -l info --concurrency=3 --task-events --without-mingle --without-gossip
14+
cd contentcuration/ && celery -A contentcuration worker -l info --concurrency=3 --task-events
1515

1616
prodcelerydashboard:
1717
# connect to the celery dashboard by visiting http://localhost:5555

contentcuration/contentcuration/utils/celery/app.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
import base64
2+
import json
3+
14
from celery import Celery
25

36
from contentcuration.utils.celery.tasks import CeleryTask
@@ -18,3 +21,26 @@ def on_init(self):
1821
@property
1922
def AsyncResult(self):
2023
return self._result_cls
24+
25+
def get_queued_tasks(self, queue_name="celery"):
26+
"""
27+
Returns the list of tasks in the queue.
28+
29+
Use `app.control.inspect()` to get information about tasks no longer in the queue
30+
31+
:param queue_name: The queue name, defaults to the default "celery" queue
32+
:return: dict[]
33+
"""
34+
decoded_tasks = []
35+
with self.pool.acquire(block=True) as conn:
36+
tasks = conn.default_channel.client.lrange(queue_name, 0, -1)
37+
38+
for task in tasks:
39+
try:
40+
j = json.loads(task)
41+
body = json.loads(base64.b64decode(j['body']))
42+
decoded_tasks.append(body)
43+
except (TypeError, json.JSONDecodeError, AttributeError):
44+
pass
45+
46+
return decoded_tasks

contentcuration/contentcuration/utils/celery/tasks.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,11 @@ def my_task(self):
7676
# custom task option
7777
track_progress = False
7878

79+
# ensure our tasks are restarted if they're interrupted
80+
acks_late = True
81+
acks_on_failure_or_timeout = True
82+
reject_on_worker_lost = True
83+
7984
_progress_tracker = None
8085

8186
def after_return(self, status, retval, task_id, args, kwargs, einfo):

0 commit comments

Comments
 (0)