55import random
66from datetime import datetime , timedelta
77
8+ import psutil
9+
810from odoo import _ , api , exceptions , fields , models
911from odoo .osv import expression
1012from odoo .tools import config , html_escape
@@ -417,6 +419,37 @@ def autovacuum(self):
417419 break
418420 return True
419421
422+ def _check_job_worker_pid (self ):
423+ """
424+ Checking that job's worker pids still exist
425+ If not, it means that the worker has been killed
426+ """
427+ jobs = self .env ["queue.job" ].search (
428+ [
429+ ("state" , "=" , "started" ),
430+ ("worker_pid" , "!=" , False ),
431+ ]
432+ )
433+
434+ failed_jobs = self .env ["queue.job" ].browse ()
435+
436+ for job in jobs :
437+ if not psutil .pid_exists (job .worker_pid ):
438+ _logger .info ("a process with pid %d does not exist" % job .worker_pid )
439+ failed_jobs += job
440+ if failed_jobs :
441+ for job in failed_jobs :
442+ job = Job .load (job .env , job .uuid )
443+ job .set_failed (
444+ exc_name = _ ("WorkerError" ),
445+ exc_info = _ (
446+ "The worker executing the job was killed."
447+ "This is likely to be due to a timeout"
448+ ),
449+ exc_message = _ ("Associated worker was killed" ),
450+ )
451+ job .store ()
452+
420453 def requeue_stuck_jobs (self , enqueued_delta = 1 , started_delta = 0 ):
421454 """Fix jobs that are in a bad states
422455
@@ -431,6 +464,9 @@ def requeue_stuck_jobs(self, enqueued_delta=1, started_delta=0):
431464 """
432465 if started_delta == - 1 :
433466 started_delta = (config ["limit_time_real" ] // 60 ) + 1
467+
468+ self ._check_job_worker_pid ()
469+
434470 return self ._get_stuck_jobs_to_requeue (
435471 enqueued_delta = enqueued_delta , started_delta = started_delta
436472 ).requeue ()
0 commit comments