Skip to content

Commit defd5d9

Browse files
[ADD] queue_job_cron_jobrunner
1 parent 5fe8275 commit defd5d9

14 files changed

Lines changed: 309 additions & 0 deletions

File tree

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
TO BE GENERATED AUTOMATICALLY
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
from . import models
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
{
2+
"name": "Queue Job Cron Jobrunner",
3+
"summary": "Run jobs without a dedicated JobRunner",
4+
"version": "15.0.1.0.0",
5+
"author": "Camptocamp SA, Odoo Community Association (OCA)",
6+
"maintainers": ["ivantodorovich"],
7+
"website": "https://github.com/OCA/queue",
8+
"license": "AGPL-3",
9+
"category": "Others",
10+
"depends": ["queue_job"],
11+
"data": ["data/ir_cron.xml"],
12+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
<?xml version="1.0" encoding="utf-8" ?>
2+
<odoo noupdate="1">
3+
4+
<record id="queue_job_cron" model="ir.cron">
5+
<field name="name">Queue Job Runner</field>
6+
<field name="model_id" ref="queue_job.model_queue_job" />
7+
<field name="state">code</field>
8+
<field name="code">model._job_runner()</field>
9+
<field name="user_id" ref="base.user_root" />
10+
<field name="interval_number">1</field>
11+
<field name="interval_type">days</field>
12+
<field name="numbercall">-1</field>
13+
</record>
14+
15+
</odoo>
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
from . import queue_job
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
# Copyright 2022 Camptocamp SA (https://www.camptocamp.com).
2+
# @author Iván Todorovich <ivan.todorovich@camptocamp.com>
3+
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).
4+
5+
import logging
6+
import traceback
7+
from io import StringIO
8+
9+
from psycopg2 import OperationalError
10+
11+
from odoo import _, api, models, tools
12+
from odoo.service.model import PG_CONCURRENCY_ERRORS_TO_RETRY
13+
14+
from odoo.addons.queue_job.controllers.main import PG_RETRY
15+
from odoo.addons.queue_job.exception import (
16+
FailedJobError,
17+
NothingToDoJob,
18+
RetryableJobError,
19+
)
20+
from odoo.addons.queue_job.job import Job
21+
22+
_logger = logging.getLogger(__name__)
23+
24+
25+
class QueueJob(models.Model):
26+
_inherit = "queue.job"
27+
28+
@api.model
29+
def _acquire_one_job(self):
30+
"""Acquire the next job to be run.
31+
32+
:returns: queue.job record (locked for update)
33+
"""
34+
# TODO: This method should respect channel priority and capacity,
35+
# rather than just fetching them by creation date.
36+
self.flush()
37+
self.env.cr.execute(
38+
"""
39+
SELECT *
40+
FROM queue_job
41+
WHERE state = 'pending'
42+
AND (eta IS NULL OR eta <= (now() AT TIME ZONE 'UTC'))
43+
ORDER BY date_created DESC
44+
LIMIT 1 FOR NO KEY UPDATE SKIP LOCKED
45+
"""
46+
)
47+
row = self.env.cr.dictfetchone()
48+
return self.browse(row and row["id"])
49+
50+
def _process(self, commit=False):
51+
"""Process the job"""
52+
self.ensure_one()
53+
job = Job._load_from_db_record(self)
54+
# Set it as started
55+
job.set_started()
56+
job.store()
57+
if _logger.isEnabledFor(logging.DEBUG):
58+
_logger.debug("%s started", job.uuid)
59+
# TODO: Commit the state change so that the state can be read from the UI
60+
# while the job is processing. However, doing this will release the
61+
# lock on the db, so we need to find another way.
62+
# if commit:
63+
# self.flush()
64+
# self.env.cr.commit()
65+
66+
# Actual processing
67+
try:
68+
try:
69+
with self.env.cr.savepoint():
70+
job.perform()
71+
job.set_done()
72+
job.store()
73+
except OperationalError as err:
74+
# Automatically retry the typical transaction serialization errors
75+
if err.pgcode not in PG_CONCURRENCY_ERRORS_TO_RETRY:
76+
raise
77+
message = tools.ustr(err.pgerror, errors="replace")
78+
job.postpone(result=message, seconds=PG_RETRY)
79+
job.set_pending(reset_retry=False)
80+
job.store()
81+
if _logger.isEnabledFor(logging.DEBUG):
82+
_logger.debug("%s OperationalError, postponed", job)
83+
84+
except NothingToDoJob as err:
85+
if str(err):
86+
msg = str(err)
87+
else:
88+
msg = _("Job interrupted and set to Done: nothing to do.")
89+
job.set_done(msg)
90+
job.store()
91+
92+
except RetryableJobError as err:
93+
# delay the job later, requeue
94+
job.postpone(result=str(err), seconds=5)
95+
job.set_pending(reset_retry=False)
96+
job.store()
97+
if _logger.isEnabledFor(logging.DEBUG):
98+
_logger.debug("%s postponed", job)
99+
100+
except (FailedJobError, Exception):
101+
buff = StringIO()
102+
traceback.print_exc(file=buff)
103+
_logger.error(buff.getvalue())
104+
job.set_failed(exc_info=buff.getvalue())
105+
job.store()
106+
107+
if commit:
108+
self.env["base"].flush()
109+
self.env.cr.commit() # pylint: disable=invalid-commit
110+
111+
@api.model
112+
def _job_runner(self, commit=True):
113+
"""Short-lived job runner, triggered by async crons"""
114+
job = self._acquire_one_job()
115+
while job:
116+
job._process(commit=commit)
117+
job = self._acquire_one_job()
118+
# TODO: If limit_time_real_cron is reached before all the jobs are done,
119+
# the worker will be killed abruptly.
120+
# Ideally, find a way to know if we're close to reaching this limit,
121+
# stop processing, and trigger a new execution to continue.
122+
#
123+
# if job and limit_time_real_cron_reached_or_about_to_reach:
124+
# self._cron_trigger()
125+
# break
126+
127+
@api.model
128+
def _cron_trigger(self, at=None):
129+
"""Trigger the cron job runners
130+
131+
Odoo will prevent concurrent cron jobs from running.
132+
So, to eventually support parallel execution, we'd need to have (at least) the
133+
same number of ir.crons records as cron workers.
134+
135+
All crons should be triggered at the same time.
136+
"""
137+
crons = (
138+
self.env["ir.cron"]
139+
.sudo()
140+
.search(
141+
[
142+
("model_id.model", "=", "queue.job"),
143+
("code", "=", "model._job_runner()"),
144+
]
145+
)
146+
)
147+
for cron in crons:
148+
cron._trigger(at=at)
149+
150+
def _ensure_cron_trigger(self):
151+
"""Create cron triggers for these jobs"""
152+
records = self.filtered(lambda r: r.state == "pending")
153+
if not records:
154+
return
155+
# Trigger immediate runs
156+
immediate = any(not rec.eta for rec in records)
157+
if immediate:
158+
self._cron_trigger()
159+
# Trigger delayed eta runs
160+
delayed_etas = {rec.eta for rec in records if rec.eta}
161+
if delayed_etas:
162+
self._cron_trigger(at=list(delayed_etas))
163+
164+
@api.model_create_multi
165+
def create(self, vals_list):
166+
# When jobs are created, also create the cron trigger
167+
records = super().create(vals_list)
168+
records._ensure_cron_trigger()
169+
return records
170+
171+
def write(self, vals):
172+
# When a job state or eta changes, make sure a cron trigger is created
173+
res = super().write(vals)
174+
if "state" in vals or "eta" in vals:
175+
self._ensure_cron_trigger()
176+
return res
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
.. warning::
2+
3+
Don't use this module if you're already running the regular ``queue_job`` runner.
4+
5+
6+
For the easiest case, no configuration is required besides installing the module.
7+
8+
To avoid CronWorker CPU timeout from abruptly stopping the job processing cron, it's
9+
recommended to launch Odoo with ``--limit-time-real-cron=0``, to disable the CronWorker
10+
timeout altogether.
11+
12+
.. note::
13+
14+
In Odoo.sh, this is done by default.
15+
16+
17+
Parallel execution of jobs can be achieved by leveraging multiple ``ir.cron`` records:
18+
19+
* Make sure you have enough CronWorkers available (Odoo CLI ``--max-cron-threads``)
20+
* Duplicate the ``queue_job_cron`` cron record as many times as needed, until you have
21+
as much records as cron workers.
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
* `Camptocamp <https://www.camptocamp.com>`_
2+
3+
* Iván Todorovich <ivan.todorovich@camptocamp.com>
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
This module implements a simple ``queue.job`` runner using ``ir.cron`` triggers.
2+
3+
It's meant to be used on environments where the regular job runner can't be run, like
4+
on Odoo.sh.
5+
6+
Unlike the regular job runner, where jobs are dispatched to the HttpWorkers, jobs are
7+
processed on the CronWorker threads by the job runner crons. This is a design decision
8+
because:
9+
10+
* Odoo.sh puts HttpWorkers to sleep when there's no network activity
11+
* HttpWorkers are meant for traffic. Users shouldn't pay the price of background tasks.
12+
13+
For now, it only implements the most basic features of the ``queue_job`` runner, notably
14+
no channel capacity nor priorities. Please check the ROADMAP for further details.
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
* Support channel capacity and priority. (See ``_acquire_one_job``)
2+
* Gracefully handle CronWorker CPU timeouts. (See ``_job_runner``)
3+
* Commit transaction after job state updated to started. (See ``_process``)

0 commit comments

Comments
 (0)