From f28e85e124a71d32f51bcaf997bed52d5eb69867 Mon Sep 17 00:00:00 2001 From: George Waller <54817483+George9Waller@users.noreply.github.com> Date: Fri, 21 Feb 2025 13:28:19 +0100 Subject: [PATCH 1/3] Add exclude_future_jobs arg to get_queue_depths New arg exclude_future_jobs filters the READY and NEW jobs to only be ones without a run_after or the run_after is now/in the past --- django_dbq/models.py | 15 +++++++++++---- django_dbq/tests.py | 25 ++++++++++++++++++++++++- 2 files changed, 35 insertions(+), 5 deletions(-) diff --git a/django_dbq/models.py b/django_dbq/models.py index b58eef4..a90354f 100644 --- a/django_dbq/models.py +++ b/django_dbq/models.py @@ -8,7 +8,7 @@ get_failure_hook_name, get_creation_hook_name, ) -from django.db.models import JSONField, UUIDField, Count, TextChoices +from django.db.models import JSONField, UUIDField, Count, TextChoices, Q import datetime import logging import uuid @@ -173,10 +173,17 @@ def run_creation_hook(self): creation_hook_function(self) @staticmethod - def get_queue_depths(): + def get_queue_depths(*, exclude_future_jobs=False): + jobs_waiting_in_queue = Job.objects.filter( + state__in=(Job.STATES.READY, Job.STATES.NEW) + ) + if exclude_future_jobs: + jobs_waiting_in_queue = jobs_waiting_in_queue.filter( + Q(run_after__isnull=True) | Q(run_after__lte=timezone.now()) + ) + annotation_dicts = ( - Job.objects.filter(state__in=(Job.STATES.READY, Job.STATES.NEW)) - .values("queue_name") + jobs_waiting_in_queue.values("queue_name") .order_by("queue_name") .annotate(Count("queue_name")) ) diff --git a/django_dbq/tests.py b/django_dbq/tests.py index dd83540..8709eb5 100644 --- a/django_dbq/tests.py +++ b/django_dbq/tests.py @@ -71,12 +71,17 @@ def test_worker_with_queue_name(self): self.assertTrue("test_queue" in output) +@freezegun.freeze_time("2025-01-01T12:00:00Z") @override_settings(JOBS={"testjob": {"tasks": ["a"]}}) class JobModelMethodTestCase(TestCase): def test_get_queue_depths(self): Job.objects.create(name="testjob", queue_name="default") Job.objects.create(name="testjob", queue_name="testworker") - Job.objects.create(name="testjob", queue_name="testworker") + Job.objects.create( + name="testjob", + queue_name="testworker", + run_after=timezone.make_aware(datetime(2025, 1, 1, 13, 0, 0)), + ) Job.objects.create( name="testjob", queue_name="testworker", state=Job.STATES.FAILED ) @@ -87,6 +92,24 @@ def test_get_queue_depths(self): queue_depths = Job.get_queue_depths() self.assertDictEqual(queue_depths, {"default": 1, "testworker": 2}) + def test_get_queue_depths_exclude_future_jobs(self): + Job.objects.create(name="testjob", queue_name="default") + Job.objects.create(name="testjob", queue_name="testworker") + Job.objects.create( + name="testjob", + queue_name="testworker", + run_after=timezone.make_aware(datetime(2025, 1, 1, 13, 0, 0)), + ) + Job.objects.create( + name="testjob", queue_name="testworker", state=Job.STATES.FAILED + ) + Job.objects.create( + name="testjob", queue_name="testworker", state=Job.STATES.COMPLETE + ) + + queue_depths = Job.get_queue_depths(exclude_future_jobs=True) + self.assertDictEqual(queue_depths, {"default": 1, "testworker": 1}) + @override_settings(JOBS={"testjob": {"tasks": ["a"]}}) class QueueDepthTestCase(TestCase): From 738be6db1603c96429023f48d29022c154757481 Mon Sep 17 00:00:00 2001 From: George Waller <54817483+George9Waller@users.noreply.github.com> Date: Fri, 21 Feb 2025 13:29:43 +0100 Subject: [PATCH 2/3] Add the exclude_future_jobs arg to the queue_depth command The queue_depth command can call get_queue_depths with the option to exclude jobs in the future --- django_dbq/management/commands/queue_depth.py | 5 +++- django_dbq/tests.py | 30 +++++++++++++++++-- 2 files changed, 32 insertions(+), 3 deletions(-) diff --git a/django_dbq/management/commands/queue_depth.py b/django_dbq/management/commands/queue_depth.py index 3419601..cb8b6fd 100644 --- a/django_dbq/management/commands/queue_depth.py +++ b/django_dbq/management/commands/queue_depth.py @@ -8,10 +8,13 @@ class Command(BaseCommand): def add_arguments(self, parser): parser.add_argument("queue_name", nargs="*", default=["default"], type=str) + parser.add_argument("--exclude_future_jobs", default=False, type=bool) def handle(self, *args, **options): queue_names = options["queue_name"] - queue_depths = Job.get_queue_depths() + queue_depths = Job.get_queue_depths( + exclude_future_jobs=options["exclude_future_jobs"] + ) queue_depths_string = " ".join( [ diff --git a/django_dbq/tests.py b/django_dbq/tests.py index 8709eb5..20c4029 100644 --- a/django_dbq/tests.py +++ b/django_dbq/tests.py @@ -111,15 +111,19 @@ def test_get_queue_depths_exclude_future_jobs(self): self.assertDictEqual(queue_depths, {"default": 1, "testworker": 1}) +@freezegun.freeze_time("2025-01-01T12:00:00Z") @override_settings(JOBS={"testjob": {"tasks": ["a"]}}) class QueueDepthTestCase(TestCase): def test_queue_depth(self): - Job.objects.create(name="testjob", state=Job.STATES.FAILED) Job.objects.create(name="testjob", state=Job.STATES.NEW) Job.objects.create(name="testjob", state=Job.STATES.FAILED) Job.objects.create(name="testjob", state=Job.STATES.COMPLETE) - Job.objects.create(name="testjob", state=Job.STATES.READY) + Job.objects.create( + name="testjob", + state=Job.STATES.READY, + run_after=timezone.make_aware(datetime(2025, 1, 1, 13, 0, 0)), + ) Job.objects.create( name="testjob", queue_name="testqueue", state=Job.STATES.READY ) @@ -132,6 +136,28 @@ def test_queue_depth(self): output = stdout.getvalue() self.assertEqual(output.strip(), "event=queue_depths default=2") + def test_queue_depth_exclude_future_jobs(self): + Job.objects.create(name="testjob", state=Job.STATES.FAILED) + Job.objects.create(name="testjob", state=Job.STATES.NEW) + Job.objects.create(name="testjob", state=Job.STATES.FAILED) + Job.objects.create(name="testjob", state=Job.STATES.COMPLETE) + Job.objects.create( + name="testjob", + state=Job.STATES.READY, + run_after=timezone.make_aware(datetime(2025, 1, 1, 13, 0, 0)), + ) + Job.objects.create( + name="testjob", queue_name="testqueue", state=Job.STATES.READY + ) + Job.objects.create( + name="testjob", queue_name="testqueue", state=Job.STATES.READY + ) + + stdout = StringIO() + call_command("queue_depth", exclude_future_jobs=True, stdout=stdout) + output = stdout.getvalue() + self.assertEqual(output.strip(), "event=queue_depths default=1") + def test_queue_depth_multiple_queues(self): Job.objects.create(name="testjob", state=Job.STATES.FAILED) From 73228276f58dcd4502503a2255a4580828eb0ab2 Mon Sep 17 00:00:00 2001 From: George Waller <54817483+George9Waller@users.noreply.github.com> Date: Fri, 21 Feb 2025 13:30:09 +0100 Subject: [PATCH 3/3] Include new arg in readme documentation --- README.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/README.md b/README.md index c4e698a..04f0caf 100644 --- a/README.md +++ b/README.md @@ -288,6 +288,11 @@ queue_depths = Job.get_queue_depths() print(queue_depths) # {"default": 1, "other_queue": 1} ``` +You can also exclude jobs which exist but are scheduled to be run in the future from the queue depths, where `run_after` is set to a future time from now. To do this set the `exclude_future_jobs` kwarg like so: +```python +queue_depths = Job.get_queue_depths(exclude_future_jobs=True) +``` + **Important:** When checking queue depths, do not assume that the key for your queue will always be available. Queue depths of zero won't be included in the dict returned by this method. @@ -312,6 +317,8 @@ manage.py worker [queue_name] [--rate_limit] If you'd like to check your queue depth from the command line, you can run `manage.py queue_depth [queue_name [queue_name ...]]` and any jobs in the "NEW" or "READY" states will be returned. +If you wish to exclude jobs which are scheduled to be run in the future you can add `--exclude_future_jobs` to the command. + **Important:** If you misspell or provide a queue name which does not have any jobs, a depth of 0 will always be returned. ### Gotcha: `bulk_create`