Skip to content

Commit 852cbe6

Browse files
committed
[IMP] queue: Store context in order to reuse it
1 parent 80ac55d commit 852cbe6

5 files changed

Lines changed: 55 additions & 12 deletions

File tree

queue_job/job.py

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -55,14 +55,15 @@ class DelayableRecordset(object):
5555

5656
def __init__(self, recordset, priority=None, eta=None,
5757
max_retries=None, description=None, channel=None,
58-
identity_key=None):
58+
identity_key=None, job_context=None):
5959
self.recordset = recordset
6060
self.priority = priority
6161
self.eta = eta
6262
self.max_retries = max_retries
6363
self.description = description
6464
self.channel = channel
6565
self.identity_key = identity_key
66+
self.job_context = job_context
6667

6768
def __getattr__(self, name):
6869
if name in self.recordset:
@@ -87,9 +88,18 @@ def delay(*args, **kwargs):
8788
eta=self.eta,
8889
description=self.description,
8990
channel=self.channel,
90-
identity_key=self.identity_key)
91+
identity_key=self.identity_key,
92+
job_context=self.get_context(recordset_method))
9193
return delay
9294

95+
def get_context(self, method):
96+
original_ctx = self.job_context
97+
ctx = {}
98+
for key in getattr(method, 'allow_context', []):
99+
if key in original_ctx:
100+
ctx[key] = original_ctx[key]
101+
return ctx
102+
93103
def __str__(self):
94104
return "DelayableRecordset(%s%s)" % (
95105
self.recordset._name,
@@ -241,6 +251,10 @@ class Job(object):
241251
be added to a channel if the existing job with the same key is not yet
242252
started or executed.
243253
254+
.. attribute::job_context
255+
256+
Original context of the job
257+
244258
"""
245259
@classmethod
246260
def load(cls, env, job_uuid):
@@ -259,7 +273,6 @@ def _load_from_db_record(cls, job_db_record):
259273
args = stored.args
260274
kwargs = stored.kwargs
261275
method_name = stored.method_name
262-
263276
model = env[stored.model_name]
264277

265278
recordset = model.browse(stored.record_ids)
@@ -297,6 +310,7 @@ def _load_from_db_record(cls, job_db_record):
297310
if stored.company_id:
298311
job_.company_id = stored.company_id.id
299312
job_.identity_key = stored.identity_key
313+
job_.job_context = stored.job_context or {}
300314
return job_
301315

302316
def job_record_with_same_identity_key(self):
@@ -311,7 +325,7 @@ def job_record_with_same_identity_key(self):
311325
@classmethod
312326
def enqueue(cls, func, args=None, kwargs=None,
313327
priority=None, eta=None, max_retries=None, description=None,
314-
channel=None, identity_key=None):
328+
channel=None, identity_key=None, job_context=None):
315329
"""Create a Job and enqueue it in the queue. Return the job uuid.
316330
317331
This expects the arguments specific to the job to be already extracted
@@ -324,7 +338,8 @@ def enqueue(cls, func, args=None, kwargs=None,
324338
new_job = cls(func=func, args=args,
325339
kwargs=kwargs, priority=priority, eta=eta,
326340
max_retries=max_retries, description=description,
327-
channel=channel, identity_key=identity_key)
341+
channel=channel, identity_key=identity_key,
342+
job_context=job_context)
328343
if new_job.identity_key:
329344
existing = new_job.job_record_with_same_identity_key()
330345
if existing:
@@ -355,7 +370,8 @@ def db_record_from_uuid(env, job_uuid):
355370
def __init__(self, func,
356371
args=None, kwargs=None, priority=None,
357372
eta=None, job_uuid=None, max_retries=None,
358-
description=None, channel=None, identity_key=None):
373+
description=None, channel=None, identity_key=None,
374+
job_context=None):
359375
""" Create a Job
360376
361377
:param func: function to execute
@@ -397,6 +413,7 @@ def __init__(self, func,
397413

398414
recordset = func.__self__
399415
env = recordset.env
416+
self.job_context = job_context
400417
self.model_name = recordset._name
401418
self.method_name = func.__name__
402419
self.recordset = recordset
@@ -497,6 +514,7 @@ def store(self):
497514
'date_done': False,
498515
'eta': False,
499516
'identity_key': False,
517+
'job_context': self.job_context,
500518
}
501519

502520
dt_to_string = odoo.fields.Datetime.to_string
@@ -539,7 +557,10 @@ def db_record(self):
539557

540558
@property
541559
def func(self):
542-
recordset = self.recordset.with_context(job_uuid=self.uuid)
560+
recordset = self.recordset.with_context(
561+
**(self.job_context or {})
562+
).with_context(job_uuid=self.uuid, )
563+
# We want to be sure that the job_uuid is not rewritten
543564
recordset = recordset.sudo(self.user_id)
544565
return getattr(recordset, self.method_name)
545566

@@ -675,7 +696,8 @@ def _is_model_method(func):
675696
isinstance(func.__self__.__class__, odoo.models.MetaModel))
676697

677698

678-
def job(func=None, default_channel='root', retry_pattern=None):
699+
def job(func=None, default_channel='root', retry_pattern=None,
700+
allow_context=None):
679701
"""Decorator for job methods.
680702
681703
It enables the possibility to use a Model's method as a job function.
@@ -692,6 +714,8 @@ def job(func=None, default_channel='root', retry_pattern=None):
692714
is provided, jobs will be retried after
693715
:const:`RETRY_INTERVAL` seconds.
694716
:type retry_pattern: dict(retry_count,retry_eta_seconds)
717+
:param allow_context: List of allowed context keys.
718+
:type allow_context: array
695719
696720
Indicates that a method of a Model can be delayed in the Job Queue.
697721
@@ -759,7 +783,8 @@ def retryable_example():
759783
"""
760784
if func is None:
761785
return functools.partial(job, default_channel=default_channel,
762-
retry_pattern=retry_pattern)
786+
retry_pattern=retry_pattern,
787+
allow_context=allow_context)
763788

764789
def delay_from_model(*args, **kwargs):
765790
raise AttributeError(
@@ -779,6 +804,7 @@ def delay_from_model(*args, **kwargs):
779804
func.delay = delay_func
780805
func.retry_pattern = retry_pattern
781806
func.default_channel = default_channel
807+
func.allow_context = allow_context or []
782808
return func
783809

784810

queue_job/models/base.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,4 +90,5 @@ def with_delay(self, priority=None, eta=None,
9090
max_retries=max_retries,
9191
description=description,
9292
channel=channel,
93-
identity_key=identity_key)
93+
identity_key=identity_key,
94+
job_context=self.env.context.copy())

queue_job/models/queue_job.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ class QueueJob(models.Model):
8888
index=True)
8989

9090
identity_key = fields.Char()
91+
job_context = JobSerialized(readonly=True)
9192

9293
@api.model_cr
9394
def init(self):

test_queue_job/models/test_models.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,9 @@ class TestQueueJob(models.Model):
3636

3737
name = fields.Char()
3838

39-
@job
39+
@job(allow_context=[
40+
'return_context_from_context', 'expected_element_from_context'
41+
])
4042
@related_action(action='testing_related_method')
4143
@api.multi
4244
def testing_method(self, *args, **kwargs):
@@ -48,6 +50,8 @@ def testing_method(self, *args, **kwargs):
4850
raise RetryableJobError('Must be retried later')
4951
if kwargs.get('return_context'):
5052
return self.env.context
53+
if self.env.context.get('return_context_from_context', False):
54+
return self.env.context
5155
return args, kwargs
5256

5357
@job

test_queue_job/tests/test_job.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55

66
from datetime import datetime, timedelta
77
import mock
8-
98
from odoo import SUPERUSER_ID
109
import odoo.tests.common as common
1110

@@ -530,6 +529,18 @@ def test_context_uuid(self):
530529
self.assertTrue(key_present)
531530
self.assertEqual(result['job_uuid'], test_job._uuid)
532531

532+
def test_context_from_context(self):
533+
element = 'EXPECTED VALUE'
534+
delayable = self.env['test.queue.job'].with_context(
535+
return_context_from_context=True,
536+
expected_element_from_context=element,
537+
).with_delay()
538+
test_job = delayable.testing_method(return_context=True)
539+
result = test_job.perform()
540+
key_present = 'expected_element_from_context' in result
541+
self.assertTrue(key_present)
542+
self.assertEqual(result['expected_element_from_context'], element)
543+
533544
def test_override_channel(self):
534545
delayable = self.env['test.queue.job'].with_delay(
535546
channel='root.sub.sub')

0 commit comments

Comments
 (0)