
Commit 5aad591

Author: Guewen Baconnier
Support multi-nodes with lock on jobrunner
Starting several odoo (main) processes with "--load=web,queue_job" was unsupported, as it would start several job runners, which would all listen to PostgreSQL notifications and try to enqueue jobs in concurrent workers. This is an issue in several cases:

* it causes issues on odoo.sh, which uses a hybrid model for workers and starts several job runners [0]
* it defeats any setup that uses several nodes to keep the service available in case of failure of a node/host

The solution implemented here uses a PostgreSQL advisory lock, taken at session level on a connection to the "postgres" database, which ensures that 2 job runners never work on the same set of databases.

At loading, the job runner tries to acquire the lock. If it can, it initializes the connection and listens for jobs. If the lock is held by another job runner, it waits and retries to acquire it every 30 seconds. Example when a job runner is started while another one is already running:

    INFO ? odoo.addons.queue_job.jobrunner.runner: starting
    INFO ? odoo.addons.queue_job.jobrunner.runner: already started on another node

The shared lock identifier is computed from the set of databases the job runner has to listen to: if a job runner is started with ``--database=queue1`` and another with ``--database=queue2``, they will have different locks and as such will be able to work in parallel.

Important: new databases need a restart of the job runner. This was already the case before; detecting them automatically would be a great improvement, but is out of scope for this change.

[0] #169 (comment)
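The take-over behavior rests on PostgreSQL's session-level advisory locks. As a minimal standalone sketch of that mechanism (assuming a local PostgreSQL reachable with placeholder credentials; ``try_lock`` is an illustrative helper, not part of this commit):

    import psycopg2
    from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT

    def try_lock(ident):
        # one session (connection) per job runner; placeholder credentials
        conn = psycopg2.connect(dbname="postgres")
        conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
        with conn.cursor() as cr:
            cr.execute("SELECT pg_try_advisory_lock(%s)", (ident,))
            return conn, cr.fetchone()[0]

    conn_a, got_a = try_lock(42)  # True: the first runner wins the race
    conn_b, got_b = try_lock(42)  # False: the lock is already held
    conn_a.close()                # session-level locks are released on disconnect
    conn_c, got_c = try_lock(42)  # True again: another node can take over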
1 parent a089cb1 commit 5aad591

1 file changed: queue_job/jobrunner/runner.py (129 additions, 5 deletions)
@@ -18,6 +18,35 @@
 * It does not run jobs itself, but asks Odoo to run them through an
   anonymous ``/queue_job/runjob`` HTTP request. [1]_
 
+How do concurrent job runners work?
+-----------------------------------
+
+If several job runner nodes (on different hosts or not) are started,
+a shared lock ensures that only one job runner works on a database at
+a time. The following rules apply:
+
+* The identifier of the shared lock is based on the database list provided,
+  so either ``--database``/``db_name`` or all the databases in PostgreSQL.
+* When 2 job runners are started with the exact same list of databases,
+  only the first one will work. The second one will wait and take over
+  if the first one is stopped.
+
+Caveats:
+
+* If 2 job runners have a database in common but different lists (e.g.
+  ``db_name=project1,project2`` and ``db_name=project2,project3``), both job
+  runners will work on and listen to ``project2``, which will lead to
+  unexpected behavior.
+* The same applies when no database is specified and all the cluster's
+  databases are used: if a job runner is started on the cluster's databases,
+  then a new database is created and a second job runner is started, both
+  will work on the same set of databases, with unexpected behavior.
+* PostgreSQL advisory locks are based on an integer: the list of database
+  names is sorted, hashed and converted to an int64, so we lose information
+  in the identifier. A low risk of collision is possible. If it happens some
+  day, we should add an option for a custom lock identifier.
+
 How to use it?
 --------------
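As a worked example of the identifier computation described in the last caveat above (a sketch mirroring the ``name_to_int64`` helper added further down in this diff):

    import hashlib

    def name_to_int64(lock_name):
        digest = hashlib.sha256(lock_name.encode("utf-8")).digest()
        # keep only 8 bytes: pg advisory locks take a 64-bit signed integer
        return int.from_bytes(digest[:8], byteorder="big", signed=True)

    # different database lists produce different lock identifiers
    ident1 = name_to_int64("qj:queue1")
    ident2 = name_to_int64("qj:queue2")
    assert ident1 != ident2  # distinct lists -> distinct locks (collision unlikely)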
@@ -134,6 +163,7 @@
 """
 
 import datetime
+import hashlib
 import logging
 import os
 import select
@@ -152,6 +182,8 @@
 from .channels import ENQUEUED, NOT_DONE, PENDING, ChannelManager
 
 SELECT_TIMEOUT = 60
+TRY_ACQUIRE_INTERVAL = 30  # seconds
+SHARED_LOCK_KEEP_ALIVE = 60  # seconds
 ERROR_RECOVERY_DELAY = 5
 
 _logger = logging.getLogger(__name__)
@@ -251,7 +283,51 @@ def urlopen():
     thread.start()
 
 
-class Database(object):
+class SharedLockDatabase(object):
+    def __init__(self, db_name, lock_name):
+        self.db_name = db_name
+        self.lock_ident = self.name_to_int64(lock_name)
+        connection_info = _connection_info_for(db_name)
+        self.conn = psycopg2.connect(**connection_info)
+        self.conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
+        self.acquired = False
+        self.try_acquire()
+
+    @staticmethod
+    def name_to_int64(lock_name):
+        hasher = hashlib.sha256()
+        hasher.update(lock_name.encode("utf-8"))
+        # pg_try_advisory_lock is limited to an 8-byte (64-bit) signed integer
+        return int.from_bytes(hasher.digest()[:8], byteorder="big", signed=True)
+
+    def try_acquire(self):
+        self.acquired = self._acquire()
+
+    def _acquire(self):
+        with closing(self.conn.cursor()) as cr:
+            # session-level lock
+            cr.execute("SELECT pg_try_advisory_lock(%s);", (self.lock_ident,))
+            acquired = cr.fetchone()[0]
+        return acquired
+
+    def keep_alive(self):
+        query = "SELECT 1"
+        with closing(self.conn.cursor()) as cr:
+            cr.execute(query)
+
+    def close(self):
+        # pylint: disable=except-pass
+        # if close() fails, either the connection is already closed and we
+        # don't care, or it will be closed on del anyway
+        try:
+            self.conn.close()
+        except Exception:
+            pass
+        self.conn = None
+
+
+class QueueDatabase(object):
     def __init__(self, db_name):
         self.db_name = db_name
         connection_info = _connection_info_for(db_name)
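A hypothetical usage sketch of ``SharedLockDatabase`` (it assumes a configured Odoo server, so that ``_connection_info_for`` can resolve the connection parameters for the ``postgres`` database):

    # two competing runners with the same database list, hence the same lock name
    first = SharedLockDatabase("postgres", "qj:project1-project2")
    second = SharedLockDatabase("postgres", "qj:project1-project2")
    assert first.acquired       # won the race
    assert not second.acquired  # will retry every TRY_ACQUIRE_INTERVAL seconds

    first.close()        # closing the connection releases the session-level lock
    second.try_acquire()
    assert second.acquired  # the second runner takes over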
@@ -339,6 +415,11 @@ def __init__(
         if channel_config_string is None:
             channel_config_string = _channels()
         self.channel_manager.simple_configure(channel_config_string)
+
+        self.shared_lock_db = None
+        # TODO: how to detect new databases or databases
+        # on which queue_job is installed after server start?
+        self.list_db_names = self.get_db_names()
         self.db_by_name = {}
         self._stop = False
         self._stop_pipe = os.pipe()
@@ -388,11 +469,22 @@ def close_databases(self, remove_jobs=True):
                 db.close()
             except Exception:
                 _logger.warning("error closing database %s", db_name, exc_info=True)
+
         self.db_by_name = {}
 
+        if self.shared_lock_db:
+            try:
+                self.shared_lock_db.close()
+            except Exception:
+                _logger.warning(
+                    "error closing database %s",
+                    self.shared_lock_db.db_name,
+                    exc_info=True,
+                )
+
     def initialize_databases(self):
-        for db_name in self.get_db_names():
-            db = Database(db_name)
+        for db_name in self.list_db_names:
+            db = QueueDatabase(db_name)
             if not db.has_queue_job:
                 _logger.debug("queue_job is not installed for db %s", db_name)
             else:
@@ -470,6 +562,12 @@ def wait_notification(self):
                 for conn in conns:
                     conn.poll()
 
+    def keep_alive_shared_lock(self):
+        self.shared_lock_db.keep_alive()
+
+    def _lock_ident(self):
+        return "qj:{}".format("-".join(sorted(self.list_db_names)))
+
     def stop(self):
         _logger.info("graceful stop requested")
         self._stop = True
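For illustration, ``_lock_ident`` produces the same lock name regardless of the order in which the databases are configured (a small sketch):

    list_db_names = ["project2", "project1"]
    lock_name = "qj:{}".format("-".join(sorted(list_db_names)))
    assert lock_name == "qj:project1-project2"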
@@ -481,16 +579,42 @@ def run(self):
         while not self._stop:
             # outer loop does exception recovery
             try:
+                # When concurrent jobrunners are started, the first to win the
+                # race acquires an advisory lock on PostgreSQL and gets to
+                # work. When a jobrunner is stopped, the lock is released, and
+                # another node can take over.
+                self.shared_lock_db = SharedLockDatabase("postgres", self._lock_ident())
+                if not self.shared_lock_db.acquired:
+                    self.close_databases()
+                    _logger.info("already started on another node")
+                    # no database to work with... retry later in case a
+                    # concurrent node is stopped
+                    time.sleep(TRY_ACQUIRE_INTERVAL)
+                    continue
+
                 _logger.info("initializing database connections")
-                # TODO: how to detect new databases or databases
-                # on which queue_job is installed after server start?
                 self.initialize_databases()
                 _logger.info("database connections ready")
+
+                last_keep_alive = None
+
                 # inner loop does the normal processing
                 while not self._stop:
                     self.process_notifications()
                     self.run_jobs()
                     self.wait_notification()
+                    if (
+                        not last_keep_alive
+                        or time.time() >= last_keep_alive + SHARED_LOCK_KEEP_ALIVE
+                    ):
+                        last_keep_alive = time.time()
+                        # send a keepalive on the shared lock connection at
+                        # most every 60 seconds
+                        self.keep_alive_shared_lock()
+                    # TODO: here, when we have no "db_name", we could list the
+                    # databases again and, if the list changed, try to acquire
+                    # a new lock
             except KeyboardInterrupt:
                 self.stop()
             except InterruptedError:
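Putting ``run()`` together with the log messages above, the expected behavior of two competing runners looks like this (an illustrative transcript; the take-over may lag by up to TRY_ACQUIRE_INTERVAL seconds):

    node A: INFO ? odoo.addons.queue_job.jobrunner.runner: starting
    node A: INFO ? odoo.addons.queue_job.jobrunner.runner: initializing database connections
    node A: INFO ? odoo.addons.queue_job.jobrunner.runner: database connections ready
    node B: INFO ? odoo.addons.queue_job.jobrunner.runner: starting
    node B: INFO ? odoo.addons.queue_job.jobrunner.runner: already started on another node
    (node A is stopped: "graceful stop requested", lock released)
    node B: INFO ? odoo.addons.queue_job.jobrunner.runner: initializing database connections
    node B: INFO ? odoo.addons.queue_job.jobrunner.runner: database connections ready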
