33# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html)
44
55import logging
6+ import random
7+ import time
68import traceback
79from io import StringIO
810
9- from psycopg2 import OperationalError
10- from werkzeug .exceptions import Forbidden
11+ from psycopg2 import OperationalError , errorcodes
12+ from werkzeug .exceptions import BadRequest , Forbidden
1113
1214from odoo import SUPERUSER_ID , _ , api , http , registry , tools
1315from odoo .service .model import PG_CONCURRENCY_ERRORS_TO_RETRY
1416
17+ from ..delay import chain , group
1518from ..exception import FailedJobError , NothingToDoJob , RetryableJobError
1619from ..job import ENQUEUED , Job
1720
1821_logger = logging .getLogger (__name__ )
1922
2023PG_RETRY = 5 # seconds
2124
25+ DEPENDS_MAX_TRIES_ON_CONCURRENCY_FAILURE = 5
26+
2227
2328class RunJobController (http .Controller ):
2429 def _try_perform_job (self , env , job ):
@@ -35,6 +40,35 @@ def _try_perform_job(self, env, job):
3540 env .cr .commit ()
3641 _logger .debug ("%s done" , job )
3742
43+ def _enqueue_dependent_jobs (self , env , job ):
44+ tries = 0
45+ while True :
46+ try :
47+ job .enqueue_waiting ()
48+ except OperationalError as err :
49+ # Automatically retry the typical transaction serialization
50+ # errors
51+ if err .pgcode not in PG_CONCURRENCY_ERRORS_TO_RETRY :
52+ raise
53+ if tries >= DEPENDS_MAX_TRIES_ON_CONCURRENCY_FAILURE :
54+ _logger .info (
55+ "%s, maximum number of tries reached to update dependencies" ,
56+ errorcodes .lookup (err .pgcode ),
57+ )
58+ raise
59+ wait_time = random .uniform (0.0 , 2 ** tries )
60+ tries += 1
61+ _logger .info (
62+ "%s, retry %d/%d in %.04f sec..." ,
63+ errorcodes .lookup (err .pgcode ),
64+ tries ,
65+ DEPENDS_MAX_TRIES_ON_CONCURRENCY_FAILURE ,
66+ wait_time ,
67+ )
68+ time .sleep (wait_time )
69+ else :
70+ break
71+
3872 @http .route ("/queue_job/runjob" , type = "http" , auth = "none" , save_session = False )
3973 def runjob (self , db , job_uuid , ** kw ):
4074 http .request .session .db = db
@@ -111,6 +145,10 @@ def retry_postpone(job, message, seconds=None):
111145 buff .close ()
112146 raise
113147
148+ _logger .debug ("%s enqueue depends started" , job )
149+ self ._enqueue_dependent_jobs (env , job )
150+ _logger .debug ("%s enqueue depends done" , job )
151+
114152 return ""
115153
116154 def _get_failure_values (self , job , traceback_txt , orig_exception ):
@@ -125,13 +163,35 @@ def _get_failure_values(self, job, traceback_txt, orig_exception):
125163 "exc_message" : exc_message ,
126164 }
127165
166+ # flake8: noqa: C901
128167 @http .route ("/queue_job/create_test_job" , type = "http" , auth = "user" )
129168 def create_test_job (
130- self , priority = None , max_retries = None , channel = None , description = "Test job"
169+ self ,
170+ priority = None ,
171+ max_retries = None ,
172+ channel = None ,
173+ description = "Test job" ,
174+ size = 1 ,
175+ failure_rate = 0 ,
131176 ):
132177 if not http .request .env .user .has_group ("base.group_erp_manager" ):
133178 raise Forbidden (_ ("Access Denied" ))
134179
180+ if failure_rate is not None :
181+ try :
182+ failure_rate = float (failure_rate )
183+ except (ValueError , TypeError ):
184+ failure_rate = 0
185+
186+ if not (0 <= failure_rate <= 1 ):
187+ raise BadRequest ("failure_rate must be between 0 and 1" )
188+
189+ if size is not None :
190+ try :
191+ size = int (size )
192+ except (ValueError , TypeError ):
193+ size = 1
194+
135195 if priority is not None :
136196 try :
137197 priority = int (priority )
@@ -144,6 +204,35 @@ def create_test_job(
144204 except ValueError :
145205 max_retries = None
146206
207+ if size == 1 :
208+ return self ._create_single_test_job (
209+ priority = priority ,
210+ max_retries = max_retries ,
211+ channel = channel ,
212+ description = description ,
213+ failure_rate = failure_rate ,
214+ )
215+
216+ if size > 1 :
217+ return self ._create_graph_test_jobs (
218+ size ,
219+ priority = priority ,
220+ max_retries = max_retries ,
221+ channel = channel ,
222+ description = description ,
223+ failure_rate = failure_rate ,
224+ )
225+ return ""
226+
227+ def _create_single_test_job (
228+ self ,
229+ priority = None ,
230+ max_retries = None ,
231+ channel = None ,
232+ description = "Test job" ,
233+ size = 1 ,
234+ failure_rate = 0 ,
235+ ):
147236 delayed = (
148237 http .request .env ["queue.job" ]
149238 .with_delay (
@@ -152,7 +241,56 @@ def create_test_job(
152241 channel = channel ,
153242 description = description ,
154243 )
155- ._test_job ()
244+ ._test_job (failure_rate = failure_rate )
156245 )
246+ return "job uuid: %s" % (delayed .db_record ().uuid ,)
247+
248+ TEST_GRAPH_MAX_PER_GROUP = 5
157249
158- return delayed .db_record ().uuid
250+ def _create_graph_test_jobs (
251+ self ,
252+ size ,
253+ priority = None ,
254+ max_retries = None ,
255+ channel = None ,
256+ description = "Test job" ,
257+ failure_rate = 0 ,
258+ ):
259+ model = http .request .env ["queue.job" ]
260+ current_count = 0
261+
262+ possible_grouping_methods = (chain , group )
263+
264+ tails = [] # we can connect new graph chains/groups to tails
265+ root_delayable = None
266+ while current_count < size :
267+ jobs_count = min (
268+ size - current_count , random .randint (1 , self .TEST_GRAPH_MAX_PER_GROUP )
269+ )
270+
271+ jobs = []
272+ for __ in range (jobs_count ):
273+ current_count += 1
274+ jobs .append (
275+ model .delayable (
276+ priority = priority ,
277+ max_retries = max_retries ,
278+ channel = channel ,
279+ description = "%s #%d" % (description , current_count ),
280+ )._test_job (failure_rate = failure_rate )
281+ )
282+
283+ grouping = random .choice (possible_grouping_methods )
284+ delayable = grouping (* jobs )
285+ if not root_delayable :
286+ root_delayable = delayable
287+ else :
288+ tail_delayable = random .choice (tails )
289+ tail_delayable .on_done (delayable )
290+ tails .append (delayable )
291+
292+ root_delayable .delay ()
293+
294+ return "graph uuid: %s" % (
295+ list (root_delayable ._head ())[0 ]._generated_job .graph_uuid ,
296+ )
0 commit comments