From 69e111695e92fe02e8edd298745f9546280e6430 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Bidoul?= Date: Sat, 3 Jan 2026 11:52:18 +0100 Subject: [PATCH 1/2] [IMP] queue_job: let _acquire_job obtain any job in ENQUEUED state --- queue_job/controllers/main.py | 38 +++++++++++++----------- test_queue_job/tests/test_acquire_job.py | 36 ++++++++++++++-------- 2 files changed, 45 insertions(+), 29 deletions(-) diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py index 28f3534848..826d754fe8 100644 --- a/queue_job/controllers/main.py +++ b/queue_job/controllers/main.py @@ -28,7 +28,9 @@ class RunJobController(http.Controller): @classmethod - def _acquire_job(cls, env: api.Environment, job_uuid: str) -> Job | None: + def _acquire_job( + cls, env: api.Environment, job_uuid: str | None = None + ) -> Job | None: """Acquire a job for execution. - make sure it is in ENQUEUED state @@ -38,30 +40,32 @@ def _acquire_job(cls, env: api.Environment, job_uuid: str) -> Job | None: If successful, return the Job instance, otherwise return None. This function may fail to acquire the job is not in the expected state or is already locked by another worker. + + If no job_uuid is given, acquire any available job in ENQUEUED state. """ - env.cr.execute( - "SELECT uuid FROM queue_job WHERE uuid=%s AND state=%s " - "FOR UPDATE SKIP LOCKED", - (job_uuid, ENQUEUED), - ) - if not env.cr.fetchone(): - _logger.warning( - "was requested to run job %s, but it does not exist, " - "or is not in state %s, or is being handled by another worker", - job_uuid, - ENQUEUED, + if job_uuid: + env.cr.execute( + "SELECT uuid FROM queue_job WHERE uuid=%s AND state=%s " + "FOR UPDATE SKIP LOCKED", + (job_uuid, ENQUEUED), ) + else: + env.cr.execute( + "SELECT uuid FROM queue_job WHERE state=%s LIMIT 1 " + "FOR UPDATE SKIP LOCKED", + (ENQUEUED,), + ) + job_row = env.cr.fetchone() + if not job_row: + _logger.debug("no job to run") return None - job = Job.load(env, job_uuid) + job = Job.load(env, job_uuid=job_row[0]) assert job and job.state == ENQUEUED job.set_started() job.store() env.cr.commit() if not job.lock(): - _logger.warning( - "was requested to run job %s, but it could not be locked", - job_uuid, - ) + _logger.debug("could not acquire lock for job %s", job.uuid) return None return job diff --git a/test_queue_job/tests/test_acquire_job.py b/test_queue_job/tests/test_acquire_job.py index 3f0c92a2be..534f8e3a1a 100644 --- a/test_queue_job/tests/test_acquire_job.py +++ b/test_queue_job/tests/test_acquire_job.py @@ -1,11 +1,11 @@ # Copyright 2026 ACSONE SA/NV # License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). -import logging from unittest import mock from odoo.tests import tagged from odoo.addons.queue_job.controllers.main import RunJobController +from odoo.addons.queue_job.job import ENQUEUED, STARTED from .common import JobCommonCase @@ -27,7 +27,7 @@ def test_acquire_enqueued_job(self): mock_commit.assert_called_once() self.assertIsNotNone(job) self.assertEqual(job.uuid, "test_enqueued_job") - self.assertEqual(job.state, "started") + self.assertEqual(job.state, STARTED) self.assertTrue( self.env["queue.job.lock"].search( [("queue_job_id", "=", job_record.id)] @@ -35,17 +35,29 @@ def test_acquire_enqueued_job(self): "A job lock record should exist at this point", ) + def test_acquire_any_enqueued_job(self): + available_job_uuids = ( + self.env["queue.job"].search([("state", "=", ENQUEUED)]).mapped("uuid") + ) + with mock.patch.object( + self.env.cr, "commit", mock.Mock(side_effect=self.env.flush_all) + ) as mock_commit: + job = RunJobController._acquire_job(self.env) + mock_commit.assert_called_once() + self.assertIsNotNone(job) + self.assertIn(job.uuid, available_job_uuids) + self.assertEqual(job.state, STARTED) + self.assertTrue( + self.env["queue.job.lock"].search( + [("queue_job_id", "=", job.db_record().id)] + ), + "A job lock record should exist at this point", + ) + def test_acquire_started_job(self): - with ( - mock.patch.object( - self.env.cr, "commit", mock.Mock(side_effect=self.env.flush_all) - ) as mock_commit, - self.assertLogs(level=logging.WARNING) as logs, - ): + with mock.patch.object( + self.env.cr, "commit", mock.Mock(side_effect=self.env.flush_all) + ) as mock_commit: job = RunJobController._acquire_job(self.env, "test_started_job") mock_commit.assert_not_called() self.assertIsNone(job) - self.assertIn( - "was requested to run job test_started_job, but it does not exist", - logs.output[0], - ) From f6108a75d5b74d0d68c17aa77dddde9aa0e63e81 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Bidoul?= Date: Sat, 3 Jan 2026 12:29:36 +0100 Subject: [PATCH 2/2] [IMP] queue_job: run jobs in cron workers --- queue_job/__manifest__.py | 1 + queue_job/controllers/main.py | 24 +++++++-- queue_job/data/queue_job_executor_cron.xml | 11 ++++ queue_job/models/__init__.py | 1 + queue_job/models/queue_job_executor.py | 60 ++++++++++++++++++++++ 5 files changed, 92 insertions(+), 5 deletions(-) create mode 100644 queue_job/data/queue_job_executor_cron.xml create mode 100644 queue_job/models/queue_job_executor.py diff --git a/queue_job/__manifest__.py b/queue_job/__manifest__.py index 978356cfd7..823e370a7c 100644 --- a/queue_job/__manifest__.py +++ b/queue_job/__manifest__.py @@ -20,6 +20,7 @@ "wizards/queue_requeue_job_views.xml", "views/queue_job_menus.xml", "data/queue_data.xml", + "data/queue_job_executor_cron.xml", "data/queue_job_function_data.xml", ], "assets": { diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py index 826d754fe8..17bad056b4 100644 --- a/queue_job/controllers/main.py +++ b/queue_job/controllers/main.py @@ -161,6 +161,7 @@ def retry_postpone(job, message, seconds=None): _logger.debug("%s enqueue depends started", job) cls._enqueue_dependent_jobs(env, job) + env.cr.commit() _logger.debug("%s enqueue depends done", job) @classmethod @@ -185,13 +186,26 @@ def _get_failure_values(cls, job, traceback_txt, orig_exception): save_session=False, readonly=False, ) - def runjob(self, db, job_uuid, **kw): + def runjob(self, db: str, job_uuid: str | None, **kw): http.request.session.db = db env = http.request.env(user=SUPERUSER_ID) - job = self._acquire_job(env, job_uuid) - if not job: - return "" - self._runjob(env, job) + run_as = env["ir.config_parameter"].get_param("queue_job.run_as") + if run_as == "cron": + crons = env["ir.cron"].search( + env["queue.job.executor"]._executor_cron_domain() + ) + assert crons, "No queue_job executor cron found" + for cron in crons: + # TODO Awaking all of them is a bit wasteful although not very + # costly. Ideally we should awaken only one that is not already + # running. + cron._trigger() + else: + # Run in this http worker + job = self._acquire_job(env, job_uuid) + if not job: + return "" + self._runjob(env, job) return "" # flake8: noqa: C901 diff --git a/queue_job/data/queue_job_executor_cron.xml b/queue_job/data/queue_job_executor_cron.xml new file mode 100644 index 0000000000..5439b6f051 --- /dev/null +++ b/queue_job/data/queue_job_executor_cron.xml @@ -0,0 +1,11 @@ + + + + Queue Job Executor + + code + model._execute_ready_jobs() + 1 + hours + + diff --git a/queue_job/models/__init__.py b/queue_job/models/__init__.py index 6265dfe9cb..781236b064 100644 --- a/queue_job/models/__init__.py +++ b/queue_job/models/__init__.py @@ -2,5 +2,6 @@ from . import ir_model_fields from . import queue_job from . import queue_job_channel +from . import queue_job_executor from . import queue_job_function from . import queue_job_lock diff --git a/queue_job/models/queue_job_executor.py b/queue_job/models/queue_job_executor.py new file mode 100644 index 0000000000..9e7923558c --- /dev/null +++ b/queue_job/models/queue_job_executor.py @@ -0,0 +1,60 @@ +# Copyright (c) 2026 ACSONE SA/NV () +# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html) + +import logging + +from odoo import api, models + +from ..controllers.main import RunJobController +from ..job import Job + +_logger = logging.getLogger(__name__) + + +class QueueJobExecutor(models.AbstractModel): + _name = "queue.job.executor" + _description = "Queue Job Executor" + + @api.model + def _executor_cron_domain(self) -> list: + model_id = self.env["ir.model"]._get("queue.job.executor").id + return [ + ("model_id", "=", model_id), + ("state", "=", "code"), + ("code", "=", "model._execute_ready_jobs()"), + ] + + @api.model + def _ensure_executor_crons(self, capacity: int) -> None: + """Since Odoo cron can't run cron jobs in parallel, we create several. + + `capacity` should be equal to the root channel capacity. If it's more, + it's wasteful. If it's less, job will stay in ENQUEUED state longer than + needed and loop back to PENDING due to the dead jobs requeuer. + """ + if capacity < 1: + return + ref_cron = self.env.ref("queue_job.queue_job_executor_cron") + ref_cron.active = True + # remove clones + self.env["ir.cron"].with_context(active_test=False).search( + self._executor_cron_domain() + [("id", "!=", ref_cron.id)] + ).unlink() + # re-create desired number of clones + for _i in range(1, capacity): + ref_cron.copy() + + @api.model + def _enable_executor_cron(self, capacity: int) -> None: + self._ensure_executor_crons(capacity) + self.env["ir.config_parameter"].set_param("queue_job.run_as", "cron") + + @api.model + def _execute_job(self, job: Job) -> None: + RunJobController._runjob(self.env, job) + + @api.model + def _execute_ready_jobs(self) -> None: + while job := RunJobController._acquire_job(self.env): + _logger.debug("executor cron running queue job %s", job.uuid) + self._execute_job(job)