Skip to content

Commit 9bcde94

Browse files
committed
Merge PR #668 into 16.0
Signed-off-by sbidoul
2 parents 5c3d65c + 2631808 commit 9bcde94

1 file changed

Lines changed: 35 additions & 6 deletions

File tree

queue_job/jobrunner/runner.py

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -143,12 +143,17 @@
143143

144144
SELECT_TIMEOUT = 60
145145
ERROR_RECOVERY_DELAY = 5
146+
PG_ADVISORY_LOCK_ID = 2293787760715711918
146147

147148
_logger = logging.getLogger(__name__)
148149

149150
select = selectors.DefaultSelector
150151

151152

153+
class MasterElectionLost(Exception):
    """Raised when the master runner lock could not be acquired."""
155+
156+
152157
# Unfortunately, it is not possible to extend the Odoo
153158
# server command line arguments, so we resort to environment variables
154159
# to configure the runner (channels mostly).
@@ -227,10 +232,15 @@ def __init__(self, db_name):
227232
self.db_name = db_name
228233
connection_info = _connection_info_for(db_name)
229234
self.conn = psycopg2.connect(**connection_info)
230-
self.conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
231-
self.has_queue_job = self._has_queue_job()
232-
if self.has_queue_job:
233-
self._initialize()
235+
try:
236+
self.conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
237+
self.has_queue_job = self._has_queue_job()
238+
if self.has_queue_job:
239+
self._acquire_master_lock()
240+
self._initialize()
241+
except BaseException:
242+
self.close()
243+
raise
234244

235245
def close(self):
236246
# pylint: disable=except-pass
@@ -243,6 +253,14 @@ def close(self):
243253
pass
244254
self.conn = None
245255

256+
def _acquire_master_lock(self):
    """Take the master runner advisory lock on this database's connection.

    Runs ``SELECT pg_try_advisory_lock(PG_ADVISORY_LOCK_ID)`` on
    ``self.conn`` and inspects the first column of the returned row.

    Raises:
        MasterElectionLost: if that column is falsy, i.e. the lock was
            not obtained.
    """
    with closing(self.conn.cursor()) as cursor:
        cursor.execute(
            "SELECT pg_try_advisory_lock(%s)", (PG_ADVISORY_LOCK_ID,)
        )
        lock_acquired = cursor.fetchone()[0]
    if lock_acquired:
        return
    # The cursor is already closed here; only the failure needs reporting.
    raise MasterElectionLost(
        f"could not acquire master runner lock on {self.db_name}"
    )
263+
246264
def _has_queue_job(self):
247265
with closing(self.conn.cursor()) as cr:
248266
cr.execute(
@@ -461,14 +479,17 @@ def close_databases(self, remove_jobs=True):
461479
self.db_by_name = {}
462480

463481
def initialize_databases(self):
464-
for db_name in self.get_db_names():
482+
for db_name in sorted(self.get_db_names()):
483+
# sorting is important to avoid deadlocks in acquiring the master lock
465484
db = Database(db_name)
466485
if db.has_queue_job:
467486
self.db_by_name[db_name] = db
468487
with db.select_jobs("state in %s", (NOT_DONE,)) as cr:
469488
for job_data in cr:
470489
self.channel_manager.notify(db_name, *job_data)
471490
_logger.info("queue job runner ready for db %s", db_name)
491+
else:
492+
db.close()
472493

473494
def requeue_dead_jobs(self):
474495
for db in self.db_by_name.values():
@@ -560,7 +581,7 @@ def run(self):
560581
while not self._stop:
561582
# outer loop does exception recovery
562583
try:
563-
_logger.info("initializing database connections")
584+
_logger.debug("initializing database connections")
564585
# TODO: how to detect new databases or databases
565586
# on which queue_job is installed after server start?
566587
self.initialize_databases()
@@ -576,6 +597,14 @@ def run(self):
576597
except InterruptedError:
577598
# Interrupted system call, i.e. KeyboardInterrupt during select
578599
self.stop()
600+
except MasterElectionLost as e:
601+
_logger.debug(
602+
"master election lost: %s, sleeping %ds and retrying",
603+
e,
604+
ERROR_RECOVERY_DELAY,
605+
)
606+
self.close_databases()
607+
time.sleep(ERROR_RECOVERY_DELAY)
579608
except Exception:
580609
_logger.exception(
581610
"exception: sleeping %ds and retrying", ERROR_RECOVERY_DELAY

0 commit comments

Comments
 (0)