Skip to content

Commit 867608d

Browse files
committed
[IMP] queue_job_cron_jobrunner: avoid looping until the end of the queue job list
Wait for the next cron trigger instead, to avoid reaching the limit_time_real_cron limit.
1 parent c51f4b4 commit 867608d

5 files changed

Lines changed: 120 additions & 39 deletions

File tree

queue_job_cron_jobrunner/README.rst

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -78,13 +78,6 @@ Parallel execution of jobs can be achieved by leveraging multiple ``ir.cron`` re
7878
* Duplicate the ``queue_job_cron`` cron record as many times as needed, until you have
7979
as much records as cron workers.
8080

81-
Known issues / Roadmap
82-
======================
83-
84-
* Support channel capacity and priority. (See ``_acquire_one_job``)
85-
* Gracefully handle CronWorker CPU timeouts. (See ``_job_runner``)
86-
* Commit transaction after job state updated to started. (See ``_process``)
87-
8881
Bug Tracker
8982
===========
9083

queue_job_cron_jobrunner/models/queue_job.py

Lines changed: 62 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
import logging
66
import traceback
7-
from datetime import datetime
7+
from datetime import datetime, timedelta
88
from io import StringIO
99

1010
import psutil
@@ -13,6 +13,7 @@
1313
from odoo import _, api, fields, models, tools
1414
from odoo.service.model import PG_CONCURRENCY_ERRORS_TO_RETRY
1515

16+
from odoo.addons.base.models.ir_cron import _intervalTypes
1617
from odoo.addons.queue_job.controllers.main import PG_RETRY
1718
from odoo.addons.queue_job.exception import (
1819
FailedJobError,
@@ -164,17 +165,69 @@ def _job_runner(self, commit=True):
164165
"""Short-lived job runner, triggered by async crons"""
165166
self._release_started_jobs(commit=commit)
166167
job = self._acquire_one_job(commit=commit)
168+
167169
while job:
168170
job._process(commit=commit)
171+
172+
if self._stop_processing():
173+
_logger.info(
174+
"Stop processing queue jobs in this "
175+
"ir.cron call, waiting next ir.cron call.",
176+
)
177+
return
178+
169179
job = self._acquire_one_job(commit=commit)
170-
# TODO: If limit_time_real_cron is reached before all the jobs are done,
171-
# the worker will be killed abruptly.
172-
# Ideally, find a way to know if we're close to reaching this limit,
173-
# stop processing, and trigger a new execution to continue.
174-
#
175-
# if job and limit_time_real_cron_reached_or_about_to_reach:
176-
# self._cron_trigger()
177-
# break
180+
181+
@api.model
182+
def _stop_processing(self):
183+
"""Compute whether the next ir.cron call is about to be
184+
triggered; if so, we stop processing queue jobs here.
185+
186+
One of the goals is to mitigate the case where, with a long list of queue
187+
jobs to process, the cron thread gets killed
188+
by odoo.sh or Odoo because of the limit_time_real_cron limit.
189+
190+
We suggest setting the ir.cron interval lower than limit_time_real_cron.
191+
"""
192+
# Neither the current cursor nor a new cursor can see the fresh nextcall, which
193+
# is committed by Odoo at the end of the cron, so we assume all crons are running
194+
# and that nextcall is the start date of the current run.
195+
next_calls = [
196+
cron.nextcall + _intervalTypes[cron.interval_type](cron.interval_number)
197+
for cron in self.env["ir.cron"]
198+
.sudo()
199+
.search([("queue_job_runner", "=", True)])
200+
]
201+
if not next_calls:
202+
_logger.info("Stopping queue job processing, no nextcall found.")
203+
return True
204+
205+
next_cron_job_runner_trigger_date = min(next_calls)
206+
207+
stop_processing_threshold_seconds = int(
208+
self.env["ir.config_parameter"]
209+
.sudo()
210+
.get_param(
211+
"queue_job_cron_jobrunner.stop_processing_threshold_seconds",
212+
"0",
213+
)
214+
)
215+
end_process_queue_job_date = next_cron_job_runner_trigger_date - timedelta(
216+
seconds=stop_processing_threshold_seconds
217+
)
218+
now = fields.Datetime.now()
219+
_logger.debug(
220+
"now: %s - estimated cron nextcall: %s - "
221+
"Threshold: %ss"
222+
"stop processing new job after %s",
223+
now,
224+
next_cron_job_runner_trigger_date,
225+
stop_processing_threshold_seconds,
226+
end_process_queue_job_date,
227+
)
228+
if now >= end_process_queue_job_date:
229+
return True
230+
return False
178231

179232
@api.model
180233
def _cron_trigger(self, at=None):

queue_job_cron_jobrunner/readme/ROADMAP.rst

Lines changed: 0 additions & 3 deletions
This file was deleted.

queue_job_cron_jobrunner/static/description/index.html

Lines changed: 10 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -392,12 +392,11 @@ <h1 class="title">Queue Job Cron Jobrunner</h1>
392392
<div class="contents local topic" id="contents">
393393
<ul class="simple">
394394
<li><a class="reference internal" href="#configuration" id="toc-entry-1">Configuration</a></li>
395-
<li><a class="reference internal" href="#known-issues-roadmap" id="toc-entry-2">Known issues / Roadmap</a></li>
396-
<li><a class="reference internal" href="#bug-tracker" id="toc-entry-3">Bug Tracker</a></li>
397-
<li><a class="reference internal" href="#credits" id="toc-entry-4">Credits</a><ul>
398-
<li><a class="reference internal" href="#authors" id="toc-entry-5">Authors</a></li>
399-
<li><a class="reference internal" href="#contributors" id="toc-entry-6">Contributors</a></li>
400-
<li><a class="reference internal" href="#maintainers" id="toc-entry-7">Maintainers</a></li>
395+
<li><a class="reference internal" href="#bug-tracker" id="toc-entry-2">Bug Tracker</a></li>
396+
<li><a class="reference internal" href="#credits" id="toc-entry-3">Credits</a><ul>
397+
<li><a class="reference internal" href="#authors" id="toc-entry-4">Authors</a></li>
398+
<li><a class="reference internal" href="#contributors" id="toc-entry-5">Contributors</a></li>
399+
<li><a class="reference internal" href="#maintainers" id="toc-entry-6">Maintainers</a></li>
401400
</ul>
402401
</li>
403402
</ul>
@@ -423,32 +422,24 @@ <h1><a class="toc-backref" href="#toc-entry-1">Configuration</a></h1>
423422
as much records as cron workers.</li>
424423
</ul>
425424
</div>
426-
<div class="section" id="known-issues-roadmap">
427-
<h1><a class="toc-backref" href="#toc-entry-2">Known issues / Roadmap</a></h1>
428-
<ul class="simple">
429-
<li>Support channel capacity and priority. (See <tt class="docutils literal">_acquire_one_job</tt>)</li>
430-
<li>Gracefully handle CronWorker CPU timeouts. (See <tt class="docutils literal">_job_runner</tt>)</li>
431-
<li>Commit transaction after job state updated to started. (See <tt class="docutils literal">_process</tt>)</li>
432-
</ul>
433-
</div>
434425
<div class="section" id="bug-tracker">
435-
<h1><a class="toc-backref" href="#toc-entry-3">Bug Tracker</a></h1>
426+
<h1><a class="toc-backref" href="#toc-entry-2">Bug Tracker</a></h1>
436427
<p>Bugs are tracked on <a class="reference external" href="https://github.com/OCA/queue/issues">GitHub Issues</a>.
437428
In case of trouble, please check there if your issue has already been reported.
438429
If you spotted it first, help us to smash it by providing a detailed and welcomed
439430
<a class="reference external" href="https://github.com/OCA/queue/issues/new?body=module:%20queue_job_cron_jobrunner%0Aversion:%2014.0%0A%0A**Steps%20to%20reproduce**%0A-%20...%0A%0A**Current%20behavior**%0A%0A**Expected%20behavior**">feedback</a>.</p>
440431
<p>Do not contact contributors directly about support or help with technical issues.</p>
441432
</div>
442433
<div class="section" id="credits">
443-
<h1><a class="toc-backref" href="#toc-entry-4">Credits</a></h1>
434+
<h1><a class="toc-backref" href="#toc-entry-3">Credits</a></h1>
444435
<div class="section" id="authors">
445-
<h2><a class="toc-backref" href="#toc-entry-5">Authors</a></h2>
436+
<h2><a class="toc-backref" href="#toc-entry-4">Authors</a></h2>
446437
<ul class="simple">
447438
<li>Camptocamp SA</li>
448439
</ul>
449440
</div>
450441
<div class="section" id="contributors">
451-
<h2><a class="toc-backref" href="#toc-entry-6">Contributors</a></h2>
442+
<h2><a class="toc-backref" href="#toc-entry-5">Contributors</a></h2>
452443
<ul>
453444
<li><p class="first"><a class="reference external" href="https://www.camptocamp.com">Camptocamp</a></p>
454445
<blockquote>
@@ -460,7 +451,7 @@ <h2><a class="toc-backref" href="#toc-entry-6">Contributors</a></h2>
460451
</ul>
461452
</div>
462453
<div class="section" id="maintainers">
463-
<h2><a class="toc-backref" href="#toc-entry-7">Maintainers</a></h2>
454+
<h2><a class="toc-backref" href="#toc-entry-6">Maintainers</a></h2>
464455
<p>This module is maintained by the OCA.</p>
465456
<a class="reference external image-reference" href="https://odoo-community.org">
466457
<img alt="Odoo Community Association" src="https://odoo-community.org/logo.png" />

queue_job_cron_jobrunner/tests/test_queue_job.py

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
# @author Iván Todorovich <ivan.todorovich@camptocamp.com>
33
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).
44

5-
from datetime import datetime
5+
from datetime import datetime, timedelta
66
from unittest import mock
77

88
from freezegun import freeze_time
@@ -37,12 +37,58 @@ def test_queue_job_process(self):
3737
job3 = self.env["res.partner"].with_delay(eta=3600).create({"name": "Test"})
3838
job3_record = job3.db_record()
3939
# Run the job processing cron
40+
self.cron.nextcall = datetime.now() + timedelta(seconds=600)
4041
self.env["queue.job"]._job_runner(commit=False)
4142
# Check that the jobs were processed
4243
self.assertEqual(job1_record.state, "done", "Processed OK")
4344
self.assertEqual(job2_record.state, "failed", "Has errors")
4445
self.assertEqual(job3_record.state, "pending", "Still pending, because of eta")
4546

47+
def test_stop_processing_job(self):
48+
# nextcall is the theoretical start time of the current cron
49+
self.cron.nextcall = datetime(2022, 2, 22, 22, 20, 22)
50+
self.cron.interval_number = 2
51+
self.cron.interval_type = "minutes"
52+
self.env["ir.config_parameter"].set_param(
53+
"queue_job_cron_jobrunner.stop_processing_threshold_seconds", "60"
54+
)
55+
job1 = self.env["res.partner"].with_delay().create({"name": "test"})
56+
job1_record = job1.db_record()
57+
job2 = self.env["res.partner"].with_delay().create({"name": "Test"})
58+
job2_record = job2.db_record()
59+
with freeze_time("2022-02-22 22:21:23"):
60+
# theoretical nextcall is 2022-02-22 22:20:22 + 2' => 22:22:22
61+
# no queue job should be started after 22:22:22 - 60" => 22:21:22
62+
self.env["queue.job"]._job_runner(commit=False)
63+
self.assertEqual(job1_record.state, "done", "Processed OK")
64+
self.assertEqual(job2_record.state, "pending", "no time left to start it")
65+
66+
def test_stop_processing_multiple_jobs(self):
67+
cron2 = self.cron.copy()
68+
# negative thresholds are allowed too: they mean we stop only after
69+
# the nextcall time has already passed
70+
self.env["ir.config_parameter"].set_param(
71+
"queue_job_cron_jobrunner.stop_processing_threshold_seconds", "-10"
72+
)
73+
74+
self.cron.nextcall = datetime(2022, 2, 22, 22, 22, 22)
75+
self.cron.interval_number = 2
76+
self.cron.interval_type = "minutes"
77+
78+
cron2.nextcall = datetime(2022, 2, 22, 22, 21, 22)
79+
cron2.interval_number = 2
80+
cron2.interval_type = "minutes"
81+
82+
# cron1 not after 22:24:32
83+
# cron2 not after 22:23:32
84+
85+
with freeze_time("2022-02-22 22:23:32"):
86+
self.assertTrue(self.env["queue.job"]._stop_processing())
87+
88+
def test_stop_processing_inactive_cron_stop_processing(self):
89+
self.cron.active = False
90+
self.assertTrue(self.env["queue.job"]._stop_processing())
91+
4692
@freeze_time("2022-02-22 22:22:22")
4793
def test_queue_job_cron_trigger_enqueue_dependencies(self):
4894
"""Test that ir.cron execution enqueue waiting dependencies"""
@@ -53,6 +99,7 @@ def test_queue_job_cron_trigger_enqueue_dependencies(self):
5399
job_record = delayable._generated_job.db_record()
54100
job_record_depends = delayable2._generated_job.db_record()
55101

102+
self.cron.nextcall = datetime(2022, 2, 22, 23, 23, 23)
56103
self.env["queue.job"]._job_runner(commit=False)
57104

58105
self.assertEqual(job_record.state, "done", "Processed OK")

0 commit comments

Comments
 (0)