Skip to content

Commit 3e5f22d

Browse files
committed
[ADD] Automate requeue of inactive jobs
* Job Queue Lock model added to track jobs being actively processed by Odoo
* Any job not being actively worked on will either be:
  * Requeued, if max_retries is not reached
  * Marked as failed, if max_retries is reached
* Covers cases where the Odoo instance is restarted or a job is killed by Odoo due to exceeding the limit_time_cpu setting

Backport of enhancement from Odoo 16 PR on OCA: OCA#716
1 parent 7972344 commit 3e5f22d

6 files changed

Lines changed: 163 additions & 18 deletions

File tree

queue_job/controllers/main.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ def _try_perform_job(self, env, job):
5656
job.set_started()
5757
job.store()
5858
http.request.env.cr.commit()
59-
59+
job.lock()
6060
_logger.debug('%s started', job)
6161
job.perform()
6262
job.set_done()

queue_job/job.py

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,60 @@ def load(cls, env, job_uuid):
252252
'Job %s does no longer exist in the storage.' % job_uuid)
253253
return cls._load_from_db_record(stored)
254254

255+
def add_lock_record(self):
    """Make sure a ``queue_job_lock`` row exists for this job.

    The row is inserted with the same ``id`` as the ``queue_job`` row whose
    ``uuid`` equals ``self.uuid``. If a lock row with that id is already
    present, the statement leaves it untouched (``ON CONFLICT ... DO
    NOTHING``). The row is meant to be locked while the job is performed.
    """
    insert_lock_row = """
        INSERT INTO
            queue_job_lock (id, queue_job_id)
        SELECT
            id, id
        FROM
            queue_job
        WHERE
            uuid = %s
        ON CONFLICT(id)
        DO NOTHING
    """
    cursor = self.env.cr
    cursor.execute(insert_lock_row, [self.uuid])
274+
275+
def lock(self):
    """Take a row-level lock on this job's ``queue_job_lock`` record.

    Issues ``SELECT ... FOR UPDATE`` on the lock row that belongs to the
    ``queue_job`` row with this job's ``uuid`` and state ``'started'``.
    The lock is taken on the cursor's current transaction.

    :raises RetryableJobError: if the query does not return exactly one
        row, i.e. the job is not in state ``'started'`` or has no lock
        record.
    """
    self.env.cr.execute(
        """
        SELECT
            *
        FROM
            queue_job_lock
        WHERE
            queue_job_id in (
                SELECT
                    id
                FROM
                    queue_job
                WHERE
                    uuid = %s
                    AND state='started'
            )
        FOR UPDATE
        """,
        [self.uuid],
    )

    # Exactly one lock row must have been selected (and thereby locked).
    locked_rows = self.env.cr.fetchall()
    if len(locked_rows) != 1:
        # %-formatting, like the rest of this file: an f-string would be a
        # syntax error on interpreters older than Python 3.6.
        raise RetryableJobError(
            "Trying to lock job that wasn't started, uuid: %s" % self.uuid
        )
308+
255309
@classmethod
256310
def _load_from_db_record(cls, job_db_record):
257311
stored = job_db_record
@@ -651,6 +705,7 @@ def set_enqueued(self):
651705
def set_started(self):
    """Flag the job as started and make sure its lock record exists.

    Sets ``state`` to ``STARTED``, stamps ``date_started`` with the value
    of ``datetime.now()`` and then calls ``add_lock_record()``.
    """
    self.state = STARTED
    started_at = datetime.now()
    self.date_started = started_at
    self.add_lock_record()
654709

655710
def set_done(self, result=None):
656711
self.state = DONE

queue_job/jobrunner/runner.py

Lines changed: 89 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@
148148
import odoo
149149
from odoo.tools import config
150150

151-
from .channels import ChannelManager, PENDING, ENQUEUED, NOT_DONE
151+
from .channels import ChannelManager, PENDING, ENQUEUED, NOT_DONE, FAILED
152152

153153
SELECT_TIMEOUT = 60
154154
ERROR_RECOVERY_DELAY = 5
@@ -214,19 +214,6 @@ def _async_http_get(scheme, host, port, user, password, db_name, job_uuid):
214214
response = session.get(url, timeout=30, auth=auth)
215215
response.raise_for_status()
216216

217-
# Method to set failed job (due to timeout, etc) as pending,
218-
# to avoid keeping it as enqueued.
219-
def set_job_pending():
220-
connection_info = _connection_info_for(db_name)
221-
conn = psycopg2.connect(**connection_info)
222-
conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
223-
with closing(conn.cursor()) as cr:
224-
cr.execute(
225-
"UPDATE queue_job SET state=%s, "
226-
"date_enqueued=NULL, date_started=NULL "
227-
"WHERE uuid=%s and state=%s", (PENDING, job_uuid, ENQUEUED)
228-
)
229-
230217
# TODO: better way to HTTP GET asynchronously (grequest, ...)?
231218
# if this was python3 I would be doing this with
232219
# asyncio, aiohttp and aiopg
@@ -250,11 +237,10 @@ def urlopen():
250237
# for codes between 500 and 600
251238
response.raise_for_status()
252239
except requests.Timeout:
253-
set_job_pending()
240+
pass
254241
except Exception:
255-
_logger.exception("exception in GET %s", url)
242+
# _logger.exception("exception in GET %s", url)
256243
session.cookies.clear()
257-
set_job_pending()
258244
thread = threading.Thread(target=urlopen)
259245
thread.daemon = True
260246
thread.start()
@@ -342,6 +328,86 @@ def set_job_enqueued(self, uuid):
342328
"WHERE uuid=%s",
343329
(ENQUEUED, uuid))
344330

331+
def _query_requeue_dead_jobs(self):
    """Return the SQL statement that requeues or fails dead jobs.

    The statement targets jobs in state ``'enqueued'`` or ``'started'``
    that were enqueued more than 10 seconds ago and whose
    ``queue_job_lock`` row can be locked right now (``FOR UPDATE SKIP
    LOCKED``): a lock row held by another transaction is skipped.

    For each selected job:

    * ``state`` becomes ``'failed'`` when ``retry >= max_retries``,
      otherwise ``'pending'``. ``max_retries = 0`` means "retry forever"
      in queue_job, so such jobs are never failed here and always go back
      to ``'pending'``.
    * ``retry`` is incremented only when the job was ``'started'``.
    * ``exc_info`` is overwritten only when the job is marked failed.

    All right-hand sides of ``SET`` are evaluated against the row values
    from before the update.

    :return: the SQL text; it takes no parameters and returns
        ``(uuid, state, name, method_name)`` for every updated job.
    """
    return """
        UPDATE
            queue_job
        SET
            state=(
            CASE
                WHEN
                    max_retries IS NOT NULL AND
                    max_retries != 0 AND
                    retry IS NOT NULL AND
                    retry>=max_retries
                THEN 'failed'
                ELSE 'pending'
            END),
            retry=(CASE WHEN state='started' THEN COALESCE(retry,0)+1 ELSE retry END),
            exc_info=(
            CASE
                WHEN
                    max_retries IS NOT NULL AND
                    max_retries != 0 AND
                    retry IS NOT NULL AND
                    retry>=max_retries
                THEN 'Job not completed, max retries reached'
                ELSE exc_info
            END)
        WHERE
            id in (
                SELECT
                    queue_job_id
                FROM
                    queue_job_lock
                WHERE
                    queue_job_id in (
                        SELECT
                            id
                        FROM
                            queue_job
                        WHERE
                            state IN ('enqueued','started')
                            AND date_enqueued <
                                (now() AT TIME ZONE 'utc' - INTERVAL '10 sec')
                    )
                FOR UPDATE SKIP LOCKED
            )
        RETURNING uuid, state, name, method_name
    """
376+
377+
def requeue_dead_jobs(self):
    """Requeue or fail jobs that are no longer being worked on.

    Executes the statement built by ``_query_requeue_dead_jobs()`` on this
    database connection and sorts the returned rows by their new state.

    A job holds a lock on its ``queue_job_lock`` row while it is being
    executed; when the job is killed, the lock is released. Jobs that are
    'enqueued' or 'started' but whose lock row is free are therefore
    treated as dead: they go back to 'pending', or to 'failed' (with an
    explanatory ``exc_info``) once the maximum number of retries is
    reached.

    The query only considers jobs enqueued for more than 10 seconds, so
    jobs are not requeued before they had a chance to actually start.

    When Odoo shuts down normally, it waits for running jobs to finish.
    When the server crashes or is force-stopped, running jobs are
    interrupted and the runner has no other way to learn they were
    aborted.

    :return: ``(requeued, failed)``: two lists of
        ``(uuid, name, method_name)`` tuples, for the jobs set back to
        pending and the jobs marked as failed.
    """
    requeued = []
    failed = []

    with closing(self.conn.cursor()) as cursor:
        cursor.execute(self._query_requeue_dead_jobs())
        for job_uuid, new_state, job_name, method_name in cursor.fetchall():
            job_info = (job_uuid, job_name, method_name)
            if new_state == PENDING:
                requeued.append(job_info)
                _logger.warning("Re-queued inactive job with UUID: %s", job_uuid)
            elif new_state == FAILED:
                failed.append(job_info)
                _logger.warning("Inactive job marked as failed with UUID: %s", job_uuid)

    return requeued, failed
410+
345411

346412
class QueueJobRunner(object):
347413

@@ -417,6 +483,11 @@ def initialize_databases(self):
417483
self.channel_manager.notify(db_name, *job_data)
418484
_logger.info('queue job runner ready for db %s', db_name)
419485

486+
def requeue_dead_jobs(self):
    """Run the dead-job requeue on every known database that has queue_job.

    Databases in ``self.db_by_name`` whose ``has_queue_job`` flag is falsy
    are skipped.
    """
    for database in self.db_by_name.values():
        if not database.has_queue_job:
            continue
        database.requeue_dead_jobs()
490+
420491
def run_jobs(self):
421492
now = _odoo_now()
422493
for job in self.channel_manager.get_jobs_to_run(now):
@@ -495,6 +566,7 @@ def run(self):
495566
_logger.info("database connections ready")
496567
# inner loop does the normal processing
497568
while not self._stop:
569+
self.requeue_dead_jobs()
498570
self.process_notifications()
499571
self.run_jobs()
500572
self.wait_notification()

queue_job/models/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
from . import base
22
from . import queue_job
3+
from . import queue_job_lock

queue_job/models/queue_job_lock.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# Copyright 2025 ACSONE SA/NV
2+
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).
3+
4+
from odoo import fields, models
5+
6+
7+
class QueueJobLock(models.Model):
    """Lock record attached to a queue job.

    Each record points to one ``queue.job`` through ``queue_job_id``; the
    link is required, indexed, and the record is deleted together with its
    job (``ondelete="cascade"``).
    """

    _description = "Queue Job Lock"
    _name = "queue.job.lock"

    # Job this lock record belongs to.
    queue_job_id = fields.Many2one(
        comodel_name="queue.job",
        index=True,
        ondelete="cascade",
        required=True,
    )

queue_job/security/ir.model.access.csv

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,4 @@ id,name,model_id:id,group_id:id,perm_read,perm_write,perm_create,perm_unlink
22
access_queue_job_manager,queue job manager,queue_job.model_queue_job,queue_job.group_queue_job_manager,1,1,1,1
33
access_queue_job_function_manager,queue job functions manager,queue_job.model_queue_job_function,queue_job.group_queue_job_manager,1,1,1,1
44
access_queue_job_channel_manager,queue job channel manager,queue_job.model_queue_job_channel,queue_job.group_queue_job_manager,1,1,1,1
5+
access_queue_job_lock_manager,queue job lock manager,queue_job.model_queue_job_lock,queue_job.group_queue_job_manager,1,0,0,0

0 commit comments

Comments
 (0)