Skip to content

Commit 8edf042

Browse files
committed
[IMP] queue_job: remove cron garbage collector and automatically requeue jobs in timeout
[IMP] queue_job: increment 'retry' when re-queuing job that have been killed
1 parent 20b0e93 commit 8edf042

11 files changed

Lines changed: 166 additions & 118 deletions

File tree

queue_job/README.rst

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -137,18 +137,7 @@ Configuration
137137
.. [1] It works with the threaded Odoo server too, although this way
138138
of running Odoo is obviously not for production purposes.
139139
140-
* Be sure to check out *Jobs Garbage Collector* CRON and change *enqueued_delta* and *started_delta* parameters to your needs.
141-
142-
* ``enqueued_delta``: Spent time in minutes after which an enqueued job is considered stuck.
143-
Set it to 0 to disable this check.
144-
* ``started_delta``: Spent time in minutes after which a started job is considered stuck.
145-
This parameter should not be less than ``--limit-time-real // 60`` parameter in your configuration.
146-
Set it to 0 to disable this check. Set it to -1 to automate it, based in the server's ``--limit-time-real`` config parameter.
147-
148-
.. code-block:: python
149-
150-
# `model` corresponds to 'queue.job' model
151-
model.requeue_stuck_jobs(enqueued_delta=1, started_delta=-1)
140+
* Jobs that remain in `enqueued` or `started` state (because, for instance, their worker has been killed) will be automatically re-queued.
152141

153142
Usage
154143
=====

queue_job/__manifest__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
{
44
"name": "Job Queue",
5-
"version": "16.0.2.6.8",
5+
"version": "16.0.2.7.0",
66
"author": "Camptocamp,ACSONE SA/NV,Odoo Community Association (OCA)",
77
"website": "https://github.com/OCA/queue",
88
"license": "LGPL-3",

queue_job/controllers/main.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ def _try_perform_job(self, env, job):
3131
job.set_started()
3232
job.store()
3333
env.cr.commit()
34+
job.lock()
35+
3436
_logger.debug("%s started", job)
3537

3638
job.perform()

queue_job/data/queue_data.xml

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,6 @@
11
<?xml version="1.0" encoding="utf-8" ?>
22
<odoo>
33
<data noupdate="1">
4-
<record id="ir_cron_queue_job_garbage_collector" model="ir.cron">
5-
<field name="name">Jobs Garbage Collector</field>
6-
<field name="interval_number">5</field>
7-
<field name="interval_type">minutes</field>
8-
<field name="numbercall">-1</field>
9-
<field ref="model_queue_job" name="model_id" />
10-
<field name="state">code</field>
11-
<field
12-
name="code"
13-
>model.requeue_stuck_jobs(enqueued_delta=1, started_delta=-1)</field>
14-
</record>
154
<!-- Queue-job-related subtypes for messaging / Chatter -->
165
<record id="mt_job_failed" model="mail.message.subtype">
176
<field name="name">Job failed</field>

queue_job/job.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,34 @@ def load_many(cls, env, job_uuids):
238238
recordset = cls.db_records_from_uuids(env, job_uuids)
239239
return {cls._load_from_db_record(record) for record in recordset}
240240

241+
def lock(self):
242+
self.env.cr.execute(
243+
"""
244+
SELECT
245+
*
246+
FROM
247+
queue_job_locks
248+
WHERE
249+
id in (
250+
SELECT
251+
id
252+
FROM
253+
queue_job
254+
WHERE
255+
uuid = %s
256+
AND state='started'
257+
)
258+
FOR UPDATE;
259+
""",
260+
[self.uuid],
261+
)
262+
263+
# 1 job should be locked
264+
if not 1 == len(self.env.cr.fetchall()):
265+
raise RetryableJobError(
266+
f"Trying to lock job that wasn't started, uuid: {self.uuid}"
267+
)
268+
241269
@classmethod
242270
def _load_from_db_record(cls, job_db_record):
243271
stored = job_db_record
@@ -517,6 +545,11 @@ def perform(self):
517545
518546
The job is executed with the user which has initiated it.
519547
"""
548+
if self.max_retries and self.retry >= self.max_retries:
549+
raise FailedJobError(
550+
"Job: %s, Max. retries (%d) reached" % (self.uuid, self.max_retries)
551+
)
552+
520553
self.retry += 1
521554
try:
522555
self.result = self.func(*tuple(self.args), **self.kwargs)
@@ -820,6 +853,23 @@ def set_started(self):
820853
self.date_started = datetime.now()
821854
self.worker_pid = os.getpid()
822855

856+
# add job to list of lockable jobs
857+
self.env.cr.execute(
858+
"""
859+
INSERT INTO
860+
queue_job_locks (id)
861+
SELECT
862+
id
863+
FROM
864+
queue_job
865+
WHERE
866+
uuid = %s
867+
ON CONFLICT(id)
868+
DO NOTHING;
869+
""",
870+
[self.uuid],
871+
)
872+
823873
def set_done(self, result=None):
824874
self.state = DONE
825875
self.exc_name = None

queue_job/jobrunner/runner.py

Lines changed: 62 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -114,22 +114,6 @@
114114
* After creating a new database or installing queue_job on an
115115
existing database, Odoo must be restarted for the runner to detect it.
116116
117-
* When Odoo shuts down normally, it waits for running jobs to finish.
118-
However, when the Odoo server crashes or is otherwise force-stopped,
119-
running jobs are interrupted while the runner has no chance to know
120-
they have been aborted. In such situations, jobs may remain in
121-
``started`` or ``enqueued`` state after the Odoo server is halted.
122-
Since the runner has no way to know if they are actually running or
123-
not, and does not know for sure if it is safe to restart the jobs,
124-
it does not attempt to restart them automatically. Such stale jobs
125-
therefore fill the running queue and prevent other jobs to start.
126-
You must therefore requeue them manually, either from the Jobs view,
127-
or by running the following SQL statement *before starting Odoo*:
128-
129-
.. code-block:: sql
130-
131-
update queue_job set state='pending' where state in ('started', 'enqueued')
132-
133117
.. rubric:: Footnotes
134118
135119
.. [1] From a security standpoint, it is safe to have an anonymous HTTP
@@ -343,6 +327,62 @@ def set_job_enqueued(self, uuid):
343327
(ENQUEUED, uuid),
344328
)
345329

330+
def requeue_dead_jobs(self):
331+
"""
332+
Set started and enqueued jobs but not locked to pending
333+
334+
A job is locked when it's being executed
335+
When a job is killed, it releases the lock
336+
337+
Adding a buffer on 'date_enqueued' to check
338+
that it has been enqueued for more than 10sec.
339+
This prevents from requeuing jobs before they are actually started.
340+
341+
When Odoo shuts down normally, it waits for running jobs to finish.
342+
However, when the Odoo server crashes or is otherwise force-stopped,
343+
running jobs are interrupted while the runner has no chance to know
344+
they have been aborted.
345+
"""
346+
347+
with closing(self.conn.cursor()) as cr:
348+
query = """
349+
UPDATE
350+
queue_job
351+
SET
352+
state='pending',
353+
retry=(CASE WHEN state='started' THEN retry+1 ELSE retry END)
354+
WHERE
355+
id in (
356+
SELECT
357+
id
358+
FROM
359+
queue_job_locks
360+
WHERE
361+
id in (
362+
SELECT
363+
id
364+
FROM
365+
queue_job
366+
WHERE
367+
state IN ('enqueued','started')
368+
AND date_enqueued <
369+
(now() AT TIME ZONE 'utc' - INTERVAL '10 sec')
370+
)
371+
FOR UPDATE SKIP LOCKED
372+
)
373+
RETURNING uuid
374+
"""
375+
376+
cr.execute(query)
377+
378+
job_uuids_to_requeue = [job_uuid[0] for job_uuid in cr.fetchall()]
379+
if job_uuids_to_requeue:
380+
for uuid in job_uuids_to_requeue:
381+
_logger.warning(
382+
"Re-queued job with uuid: %s",
383+
str(uuid),
384+
)
385+
346386

347387
class QueueJobRunner(object):
348388
def __init__(
@@ -424,6 +464,11 @@ def initialize_databases(self):
424464
self.channel_manager.notify(db_name, *job_data)
425465
_logger.info("queue job runner ready for db %s", db_name)
426466

467+
def requeue_dead_jobs(self):
468+
for db in self.db_by_name.values():
469+
if db.has_queue_job:
470+
db.requeue_dead_jobs()
471+
427472
def run_jobs(self):
428473
now = _odoo_now()
429474
for job in self.channel_manager.get_jobs_to_run(now):
@@ -516,6 +561,7 @@ def run(self):
516561
_logger.info("database connections ready")
517562
# inner loop does the normal processing
518563
while not self._stop:
564+
self.requeue_dead_jobs()
519565
self.process_notifications()
520566
self.run_jobs()
521567
self.wait_notification()
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html)
2+
3+
4+
def migrate(cr, version):
5+
# Create job lock table
6+
cr.execute(
7+
"""
8+
CREATE TABLE IF NOT EXISTS queue_job_locks (
9+
id INT PRIMARY KEY,
10+
CONSTRAINT
11+
queue_job_locks_queue_job_id_fkey
12+
FOREIGN KEY (id)
13+
REFERENCES queue_job (id) ON DELETE CASCADE
14+
);
15+
"""
16+
)
17+
18+
# Deactivate cron garbage collector
19+
cr.execute(
20+
"""
21+
UPDATE
22+
ir_cron
23+
SET
24+
active=False
25+
WHERE id IN (
26+
SELECT res_id
27+
FROM
28+
ir_model_data
29+
WHERE
30+
module='queue_job'
31+
AND model='ir.cron'
32+
AND name='ir_cron_queue_job_garbage_collector'
33+
);
34+
"""
35+
)

queue_job/models/queue_job.py

Lines changed: 0 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
from datetime import datetime, timedelta
77

88
from odoo import _, api, exceptions, fields, models
9-
from odoo.osv import expression
109
from odoo.tools import config, html_escape
1110

1211
from odoo.addons.base_sparse_field.models.fields import Serialized
@@ -417,58 +416,6 @@ def autovacuum(self):
417416
break
418417
return True
419418

420-
def requeue_stuck_jobs(self, enqueued_delta=1, started_delta=0):
421-
"""Fix jobs that are in a bad states
422-
423-
:param in_queue_delta: lookup time in minutes for jobs
424-
that are in enqueued state,
425-
0 means that it is not checked
426-
427-
:param started_delta: lookup time in minutes for jobs
428-
that are in started state,
429-
0 means that it is not checked,
430-
-1 will use `--limit-time-real` config value
431-
"""
432-
if started_delta == -1:
433-
started_delta = (config["limit_time_real"] // 60) + 1
434-
return self._get_stuck_jobs_to_requeue(
435-
enqueued_delta=enqueued_delta, started_delta=started_delta
436-
).requeue()
437-
438-
def _get_stuck_jobs_domain(self, queue_dl, started_dl):
439-
domain = []
440-
now = fields.datetime.now()
441-
if queue_dl:
442-
queue_dl = now - timedelta(minutes=queue_dl)
443-
domain.append(
444-
[
445-
"&",
446-
("date_enqueued", "<=", fields.Datetime.to_string(queue_dl)),
447-
("state", "=", "enqueued"),
448-
]
449-
)
450-
if started_dl:
451-
started_dl = now - timedelta(minutes=started_dl)
452-
domain.append(
453-
[
454-
"&",
455-
("date_started", "<=", fields.Datetime.to_string(started_dl)),
456-
("state", "=", "started"),
457-
]
458-
)
459-
if not domain:
460-
raise exceptions.ValidationError(
461-
_("If both parameters are 0, ALL jobs will be requeued!")
462-
)
463-
return expression.OR(domain)
464-
465-
def _get_stuck_jobs_to_requeue(self, enqueued_delta, started_delta):
466-
job_model = self.env["queue.job"]
467-
stuck_jobs = job_model.search(
468-
self._get_stuck_jobs_domain(enqueued_delta, started_delta)
469-
)
470-
return stuck_jobs
471-
472419
def related_action_open_record(self):
473420
"""Open a form view with the record(s) of the job.
474421

queue_job/post_init_hook.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,3 +31,16 @@ def post_init_hook(cr, registry):
3131
FOR EACH ROW EXECUTE PROCEDURE queue_job_notify();
3232
"""
3333
)
34+
35+
# Create job lock table
36+
cr.execute(
37+
"""
38+
CREATE TABLE IF NOT EXISTS queue_job_locks (
39+
id INT PRIMARY KEY,
40+
CONSTRAINT
41+
queue_job_locks_queue_job_id_fkey
42+
FOREIGN KEY (id)
43+
REFERENCES queue_job (id) ON DELETE CASCADE
44+
);
45+
"""
46+
)

queue_job/readme/CONFIGURE.rst

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -47,15 +47,4 @@
4747
.. [1] It works with the threaded Odoo server too, although this way
4848
of running Odoo is obviously not for production purposes.
4949
50-
* Be sure to check out *Jobs Garbage Collector* CRON and change *enqueued_delta* and *started_delta* parameters to your needs.
51-
52-
* ``enqueued_delta``: Spent time in minutes after which an enqueued job is considered stuck.
53-
Set it to 0 to disable this check.
54-
* ``started_delta``: Spent time in minutes after which a started job is considered stuck.
55-
This parameter should not be less than ``--limit-time-real // 60`` parameter in your configuration.
56-
Set it to 0 to disable this check. Set it to -1 to automate it, based in the server's ``--limit-time-real`` config parameter.
57-
58-
.. code-block:: python
59-
60-
# `model` corresponds to 'queue.job' model
61-
model.requeue_stuck_jobs(enqueued_delta=1, started_delta=-1)
50+
* Jobs that remain in `enqueued` or `started` state (because, for instance, their worker has been killed) will be automatically re-queued.

0 commit comments

Comments
 (0)