diff --git a/Makefile b/Makefile index 5f0035c334..b69f193a7f 100644 --- a/Makefile +++ b/Makefile @@ -11,7 +11,7 @@ dummyusers: cd contentcuration/ && python manage.py loaddata contentcuration/fixtures/admin_user_token.json prodceleryworkers: - cd contentcuration/ && celery -A contentcuration worker -l info --concurrency=3 --task-events --without-mingle --without-gossip + cd contentcuration/ && celery -A contentcuration worker -l info --concurrency=3 --task-events prodcelerydashboard: # connect to the celery dashboard by visiting http://localhost:5555 diff --git a/contentcuration/contentcuration/utils/celery/app.py b/contentcuration/contentcuration/utils/celery/app.py index 5653ee4f7a..6be59ca3f3 100644 --- a/contentcuration/contentcuration/utils/celery/app.py +++ b/contentcuration/contentcuration/utils/celery/app.py @@ -1,3 +1,6 @@ +import base64 +import json + from celery import Celery from contentcuration.utils.celery.tasks import CeleryTask @@ -18,3 +21,26 @@ def on_init(self): @property def AsyncResult(self): return self._result_cls + + def get_queued_tasks(self, queue_name="celery"): + """ + Returns the list of tasks in the queue.
+ + Use `app.control.inspect()` to get information about tasks no longer in the queue + + :param queue_name: The queue name, defaults to the default "celery" queue + :return: dict[] + """ + decoded_tasks = [] + with self.pool.acquire(block=True) as conn: + tasks = conn.default_channel.client.lrange(queue_name, 0, -1) + + for task in tasks: + try: + j = json.loads(task) + body = json.loads(base64.b64decode(j['body'])) + decoded_tasks.append(body) + except (TypeError, json.JSONDecodeError, AttributeError): + pass + + return decoded_tasks diff --git a/contentcuration/contentcuration/utils/celery/tasks.py b/contentcuration/contentcuration/utils/celery/tasks.py index f98ae4af5b..92ae0b68fe 100644 --- a/contentcuration/contentcuration/utils/celery/tasks.py +++ b/contentcuration/contentcuration/utils/celery/tasks.py @@ -76,6 +76,11 @@ def my_task(self): # custom task option track_progress = False + # ensure our tasks are restarted if they're interrupted + acks_late = True + acks_on_failure_or_timeout = True + reject_on_worker_lost = True + _progress_tracker = None def after_return(self, status, retval, task_id, args, kwargs, einfo):