Skip to content

Commit 7b8906e

Browse files
committed
Merge PR #748 into 17.0
Signed-off-by sbidoul
2 parents 45ac80a + ca5fba5 commit 7b8906e

13 files changed

Lines changed: 333 additions & 101 deletions

File tree

queue_job/__manifest__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
{
44
"name": "Job Queue",
5-
"version": "17.0.1.1.1",
5+
"version": "17.0.1.2.0",
66
"author": "Camptocamp,ACSONE SA/NV,Odoo Community Association (OCA)",
77
"website": "https://github.com/OCA/queue",
88
"license": "LGPL-3",

queue_job/controllers/main.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ def _try_perform_job(self, env, job):
3131
job.set_started()
3232
job.store()
3333
env.cr.commit()
34+
job.lock()
35+
3436
_logger.debug("%s started", job)
3537

3638
job.perform()

queue_job/data/queue_data.xml

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,6 @@
11
<?xml version="1.0" encoding="utf-8" ?>
22
<odoo>
33
<data noupdate="1">
4-
<record id="ir_cron_queue_job_garbage_collector" model="ir.cron">
5-
<field name="name">Jobs Garbage Collector</field>
6-
<field name="interval_number">5</field>
7-
<field name="interval_type">minutes</field>
8-
<field name="numbercall">-1</field>
9-
<field ref="model_queue_job" name="model_id" />
10-
<field name="state">code</field>
11-
<field name="code">model.requeue_stuck_jobs()</field>
12-
</record>
134
<!-- Queue-job-related subtypes for messaging / Chatter -->
145
<record id="mt_job_failed" model="mail.message.subtype">
156
<field name="name">Job failed</field>

queue_job/job.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,61 @@ def load_many(cls, env, job_uuids):
238238
recordset = cls.db_records_from_uuids(env, job_uuids)
239239
return {cls._load_from_db_record(record) for record in recordset}
240240

241+
def add_lock_record(self):
242+
"""
243+
Create row in db to be locked while the job is being performed.
244+
"""
245+
self.env.cr.execute(
246+
"""
247+
INSERT INTO
248+
queue_job_lock (id, queue_job_id)
249+
SELECT
250+
id, id
251+
FROM
252+
queue_job
253+
WHERE
254+
uuid = %s
255+
ON CONFLICT(id)
256+
DO NOTHING;
257+
""",
258+
[self.uuid],
259+
)
260+
261+
def lock(self):
262+
"""
263+
Lock row of job that is being performed
264+
265+
If a job cannot be locked,
266+
it means that the job wasn't started:
267+
a RetryableJobError is raised.
268+
"""
269+
self.env.cr.execute(
270+
"""
271+
SELECT
272+
*
273+
FROM
274+
queue_job_lock
275+
WHERE
276+
queue_job_id in (
277+
SELECT
278+
id
279+
FROM
280+
queue_job
281+
WHERE
282+
uuid = %s
283+
AND state='started'
284+
)
285+
FOR UPDATE;
286+
""",
287+
[self.uuid],
288+
)
289+
290+
# Exactly one lock row should have been selected (and locked)
291+
if 1 != len(self.env.cr.fetchall()):
292+
raise RetryableJobError(
293+
f"Trying to lock job that wasn't started, uuid: {self.uuid}"
294+
)
295+
241296
@classmethod
242297
def _load_from_db_record(cls, job_db_record):
243298
stored = job_db_record
@@ -819,6 +874,7 @@ def set_started(self):
819874
self.state = STARTED
820875
self.date_started = datetime.now()
821876
self.worker_pid = os.getpid()
877+
self.add_lock_record()
822878

823879
def set_done(self, result=None):
824880
self.state = DONE

queue_job/jobrunner/runner.py

Lines changed: 97 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -114,22 +114,6 @@
114114
* After creating a new database or installing queue_job on an
115115
existing database, Odoo must be restarted for the runner to detect it.
116116
117-
* When Odoo shuts down normally, it waits for running jobs to finish.
118-
However, when the Odoo server crashes or is otherwise force-stopped,
119-
running jobs are interrupted while the runner has no chance to know
120-
they have been aborted. In such situations, jobs may remain in
121-
``started`` or ``enqueued`` state after the Odoo server is halted.
122-
Since the runner has no way to know if they are actually running or
123-
not, and does not know for sure if it is safe to restart the jobs,
124-
it does not attempt to restart them automatically. Such stale jobs
125-
therefore fill the running queue and prevent other jobs to start.
126-
You must therefore requeue them manually, either from the Jobs view,
127-
or by running the following SQL statement *before starting Odoo*:
128-
129-
.. code-block:: sql
130-
131-
update queue_job set state='pending' where state in ('started', 'enqueued')
132-
133117
.. rubric:: Footnotes
134118
135119
.. [1] From a security standpoint, it is safe to have an anonymous HTTP
@@ -155,7 +139,7 @@
155139
from odoo.tools import config
156140

157141
from . import queue_job_config
158-
from .channels import ENQUEUED, NOT_DONE, PENDING, ChannelManager
142+
from .channels import ENQUEUED, NOT_DONE, ChannelManager
159143

160144
SELECT_TIMEOUT = 60
161145
ERROR_RECOVERY_DELAY = 5
@@ -207,35 +191,14 @@ def _connection_info_for(db_name):
207191

208192

209193
def _async_http_get(scheme, host, port, user, password, db_name, job_uuid):
210-
# Method to set failed job (due to timeout, etc) as pending,
211-
# to avoid keeping it as enqueued.
212-
def set_job_pending():
213-
connection_info = _connection_info_for(db_name)
214-
conn = psycopg2.connect(**connection_info)
215-
conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
216-
with closing(conn.cursor()) as cr:
217-
cr.execute(
218-
"UPDATE queue_job SET state=%s, "
219-
"date_enqueued=NULL, date_started=NULL "
220-
"WHERE uuid=%s and state=%s "
221-
"RETURNING uuid",
222-
(PENDING, job_uuid, ENQUEUED),
223-
)
224-
if cr.fetchone():
225-
_logger.warning(
226-
"state of job %s was reset from %s to %s",
227-
job_uuid,
228-
ENQUEUED,
229-
PENDING,
230-
)
231-
232194
# TODO: better way to HTTP GET asynchronously (grequest, ...)?
233195
# if this was python3 I would be doing this with
234196
# asyncio, aiohttp and aiopg
235197
def urlopen():
236198
url = "{}://{}:{}/queue_job/runjob?db={}&job_uuid={}".format(
237199
scheme, host, port, db_name, job_uuid
238200
)
201+
# pylint: disable=except-pass
239202
try:
240203
auth = None
241204
if user:
@@ -249,10 +212,10 @@ def urlopen():
249212
# for codes between 500 and 600
250213
response.raise_for_status()
251214
except requests.Timeout:
252-
set_job_pending()
215+
# A timeout is normal behaviour; it shouldn't be logged as an exception
216+
pass
253217
except Exception:
254218
_logger.exception("exception in GET %s", url)
255-
set_job_pending()
256219

257220
thread = threading.Thread(target=urlopen)
258221
thread.daemon = True
@@ -343,6 +306,93 @@ def set_job_enqueued(self, uuid):
343306
(ENQUEUED, uuid),
344307
)
345308

309+
def _query_requeue_dead_jobs(self):
310+
return """
311+
UPDATE
312+
queue_job
313+
SET
314+
state=(
315+
CASE
316+
WHEN
317+
max_retries IS NOT NULL AND
318+
retry IS NOT NULL AND
319+
retry>max_retries
320+
THEN 'failed'
321+
ELSE 'pending'
322+
END),
323+
retry=(
324+
CASE
325+
WHEN state='started'
326+
THEN COALESCE(retry,0)+1 ELSE retry
327+
END),
328+
exc_name=(
329+
CASE
330+
WHEN
331+
max_retries IS NOT NULL AND
332+
retry IS NOT NULL AND
333+
retry>max_retries
334+
THEN 'JobFoundDead'
335+
ELSE exc_name
336+
END),
337+
exc_info=(
338+
CASE
339+
WHEN
340+
max_retries IS NOT NULL AND
341+
retry IS NOT NULL AND
342+
retry>max_retries
343+
THEN 'Job found dead after too many retries'
344+
ELSE exc_info
345+
END)
346+
WHERE
347+
id in (
348+
SELECT
349+
queue_job_id
350+
FROM
351+
queue_job_lock
352+
WHERE
353+
queue_job_id in (
354+
SELECT
355+
id
356+
FROM
357+
queue_job
358+
WHERE
359+
state IN ('enqueued','started')
360+
AND date_enqueued <
361+
(now() AT TIME ZONE 'utc' - INTERVAL '10 sec')
362+
)
363+
FOR UPDATE SKIP LOCKED
364+
)
365+
RETURNING uuid
366+
"""
367+
368+
def requeue_dead_jobs(self):
369+
"""
370+
Set jobs that are started or enqueued, but not locked, back to pending
371+
372+
A job is locked when it's being executed
373+
When a job is killed, it releases the lock
374+
375+
If the number of retries exceeds the number of max retries,
376+
the job is set as 'failed' with the error 'JobFoundDead'.
377+
378+
Adding a buffer on 'date_enqueued' to check
379+
that it has been enqueued for more than 10sec.
380+
This prevents requeuing jobs before they are actually started.
381+
382+
When Odoo shuts down normally, it waits for running jobs to finish.
383+
However, when the Odoo server crashes or is otherwise force-stopped,
384+
running jobs are interrupted while the runner has no chance to know
385+
they have been aborted.
386+
"""
387+
388+
with closing(self.conn.cursor()) as cr:
389+
query = self._query_requeue_dead_jobs()
390+
391+
cr.execute(query)
392+
393+
for (uuid,) in cr.fetchall():
394+
_logger.warning("Re-queued dead job with uuid: %s", uuid)
395+
346396

347397
class QueueJobRunner:
348398
def __init__(
@@ -424,6 +474,11 @@ def initialize_databases(self):
424474
self.channel_manager.notify(db_name, *job_data)
425475
_logger.info("queue job runner ready for db %s", db_name)
426476

477+
def requeue_dead_jobs(self):
478+
for db in self.db_by_name.values():
479+
if db.has_queue_job:
480+
db.requeue_dead_jobs()
481+
427482
def run_jobs(self):
428483
now = _odoo_now()
429484
for job in self.channel_manager.get_jobs_to_run(now):
@@ -516,6 +571,7 @@ def run(self):
516571
_logger.info("database connections ready")
517572
# inner loop does the normal processing
518573
while not self._stop:
574+
self.requeue_dead_jobs()
519575
self.process_notifications()
520576
self.run_jobs()
521577
self.wait_notification()
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html)
2+
3+
4+
def migrate(cr, version):
5+
# Deactivate cron garbage collector
6+
cr.execute(
7+
"""
8+
UPDATE
9+
ir_cron
10+
SET
11+
active=False
12+
WHERE id IN (
13+
SELECT res_id
14+
FROM
15+
ir_model_data
16+
WHERE
17+
module='queue_job'
18+
AND model='ir.cron'
19+
AND name='ir_cron_queue_job_garbage_collector'
20+
);
21+
"""
22+
)

queue_job/models/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,4 @@
33
from . import queue_job
44
from . import queue_job_channel
55
from . import queue_job_function
6+
from . import queue_job_lock

queue_job/models/queue_job.py

Lines changed: 0 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
from datetime import datetime, timedelta
77

88
from odoo import _, api, exceptions, fields, models
9-
from odoo.osv import expression
109
from odoo.tools import config, html_escape
1110

1211
from odoo.addons.base_sparse_field.models.fields import Serialized
@@ -414,55 +413,6 @@ def autovacuum(self):
414413
break
415414
return True
416415

417-
def requeue_stuck_jobs(self, enqueued_delta=5, started_delta=0):
418-
"""Fix jobs that are in a bad states
419-
420-
:param in_queue_delta: lookup time in minutes for jobs
421-
that are in enqueued state
422-
423-
:param started_delta: lookup time in minutes for jobs
424-
that are in enqueued state,
425-
0 means that it is not checked
426-
"""
427-
self._get_stuck_jobs_to_requeue(
428-
enqueued_delta=enqueued_delta, started_delta=started_delta
429-
).requeue()
430-
return True
431-
432-
def _get_stuck_jobs_domain(self, queue_dl, started_dl):
433-
domain = []
434-
now = fields.datetime.now()
435-
if queue_dl:
436-
queue_dl = now - timedelta(minutes=queue_dl)
437-
domain.append(
438-
[
439-
"&",
440-
("date_enqueued", "<=", fields.Datetime.to_string(queue_dl)),
441-
("state", "=", "enqueued"),
442-
]
443-
)
444-
if started_dl:
445-
started_dl = now - timedelta(minutes=started_dl)
446-
domain.append(
447-
[
448-
"&",
449-
("date_started", "<=", fields.Datetime.to_string(started_dl)),
450-
("state", "=", "started"),
451-
]
452-
)
453-
if not domain:
454-
raise exceptions.ValidationError(
455-
_("If both parameters are 0, ALL jobs will be requeued!")
456-
)
457-
return expression.OR(domain)
458-
459-
def _get_stuck_jobs_to_requeue(self, enqueued_delta, started_delta):
460-
job_model = self.env["queue.job"]
461-
stuck_jobs = job_model.search(
462-
self._get_stuck_jobs_domain(enqueued_delta, started_delta)
463-
)
464-
return stuck_jobs
465-
466416
def related_action_open_record(self):
467417
"""Open a form view with the record(s) of the job.
468418

0 commit comments

Comments
 (0)