Skip to content

Commit

Permalink
Merge pull request #68 from George9Waller/feature-exclude-future-jobs…
Browse files Browse the repository at this point in the history
…-from-queue-depth

Feature to exclude future jobs from queue depth
  • Loading branch information
j4mie authored Feb 25, 2025
2 parents 86bfc20 + 57ba3cd commit 2fc1b10
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 8 deletions.
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,11 @@ queue_depths = Job.get_queue_depths()
print(queue_depths) # {"default": 1, "other_queue": 1}
```

You can also exclude jobs which exist but are scheduled to be run in the future from the queue depths, where `run_after` is set to a future time from now. To do this set the `exclude_future_jobs` kwarg like so:
```python
queue_depths = Job.get_queue_depths(exclude_future_jobs=True)
```

**Important:** When checking queue depths, do not assume that the key for your queue will always be available. Queue depths of zero won't be included
in the dict returned by this method.

Expand All @@ -312,6 +317,8 @@ manage.py worker [queue_name] [--rate_limit]
If you'd like to check your queue depth from the command line, you can run `manage.py queue_depth [queue_name [queue_name ...]]` and any
jobs in the "NEW" or "READY" states will be returned.

If you wish to exclude jobs which are scheduled to be run in the future you can add `--exclude_future_jobs` to the command.

**Important:** If you misspell or provide a queue name which does not have any jobs, a depth of 0 will always be returned.

### Gotcha: `bulk_create`
Expand Down
5 changes: 4 additions & 1 deletion django_dbq/management/commands/queue_depth.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@ class Command(BaseCommand):

def add_arguments(self, parser):
parser.add_argument("queue_name", nargs="*", default=["default"], type=str)
parser.add_argument("--exclude_future_jobs", default=False, type=bool)

def handle(self, *args, **options):
queue_names = options["queue_name"]
queue_depths = Job.get_queue_depths()
queue_depths = Job.get_queue_depths(
exclude_future_jobs=options["exclude_future_jobs"]
)

queue_depths_string = " ".join(
[
Expand Down
15 changes: 11 additions & 4 deletions django_dbq/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
get_failure_hook_name,
get_creation_hook_name,
)
from django.db.models import JSONField, UUIDField, Count, TextChoices
from django.db.models import JSONField, UUIDField, Count, TextChoices, Q
import datetime
import logging
import uuid
Expand Down Expand Up @@ -173,10 +173,17 @@ def run_creation_hook(self):
creation_hook_function(self)

@staticmethod
def get_queue_depths():
def get_queue_depths(*, exclude_future_jobs=False):
jobs_waiting_in_queue = Job.objects.filter(
state__in=(Job.STATES.READY, Job.STATES.NEW)
)
if exclude_future_jobs:
jobs_waiting_in_queue = jobs_waiting_in_queue.filter(
Q(run_after__isnull=True) | Q(run_after__lte=timezone.now())
)

annotation_dicts = (
Job.objects.filter(state__in=(Job.STATES.READY, Job.STATES.NEW))
.values("queue_name")
jobs_waiting_in_queue.values("queue_name")
.order_by("queue_name")
.annotate(Count("queue_name"))
)
Expand Down
55 changes: 52 additions & 3 deletions django_dbq/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,17 @@ def test_worker_with_queue_name(self):
self.assertTrue("test_queue" in output)


@freezegun.freeze_time("2025-01-01T12:00:00Z")
@override_settings(JOBS={"testjob": {"tasks": ["a"]}})
class JobModelMethodTestCase(TestCase):
def test_get_queue_depths(self):
Job.objects.create(name="testjob", queue_name="default")
Job.objects.create(name="testjob", queue_name="testworker")
Job.objects.create(name="testjob", queue_name="testworker")
Job.objects.create(
name="testjob",
queue_name="testworker",
run_after=timezone.make_aware(datetime(2025, 1, 1, 13, 0, 0)),
)
Job.objects.create(
name="testjob", queue_name="testworker", state=Job.STATES.FAILED
)
Expand All @@ -79,16 +84,38 @@ def test_get_queue_depths(self):
queue_depths = Job.get_queue_depths()
self.assertDictEqual(queue_depths, {"default": 1, "testworker": 2})

def test_get_queue_depths_exclude_future_jobs(self):
Job.objects.create(name="testjob", queue_name="default")
Job.objects.create(name="testjob", queue_name="testworker")
Job.objects.create(
name="testjob",
queue_name="testworker",
run_after=timezone.make_aware(datetime(2025, 1, 1, 13, 0, 0)),
)
Job.objects.create(
name="testjob", queue_name="testworker", state=Job.STATES.FAILED
)
Job.objects.create(
name="testjob", queue_name="testworker", state=Job.STATES.COMPLETE
)

queue_depths = Job.get_queue_depths(exclude_future_jobs=True)
self.assertDictEqual(queue_depths, {"default": 1, "testworker": 1})


@freezegun.freeze_time("2025-01-01T12:00:00Z")
@override_settings(JOBS={"testjob": {"tasks": ["a"]}})
class QueueDepthTestCase(TestCase):
def test_queue_depth(self):

Job.objects.create(name="testjob", state=Job.STATES.FAILED)
Job.objects.create(name="testjob", state=Job.STATES.NEW)
Job.objects.create(name="testjob", state=Job.STATES.FAILED)
Job.objects.create(name="testjob", state=Job.STATES.COMPLETE)
Job.objects.create(name="testjob", state=Job.STATES.READY)
Job.objects.create(
name="testjob",
state=Job.STATES.READY,
run_after=timezone.make_aware(datetime(2025, 1, 1, 13, 0, 0)),
)
Job.objects.create(
name="testjob", queue_name="testqueue", state=Job.STATES.READY
)
Expand All @@ -101,6 +128,28 @@ def test_queue_depth(self):
output = stdout.getvalue()
self.assertEqual(output.strip(), "event=queue_depths default=2")

def test_queue_depth_exclude_future_jobs(self):
Job.objects.create(name="testjob", state=Job.STATES.FAILED)
Job.objects.create(name="testjob", state=Job.STATES.NEW)
Job.objects.create(name="testjob", state=Job.STATES.FAILED)
Job.objects.create(name="testjob", state=Job.STATES.COMPLETE)
Job.objects.create(
name="testjob",
state=Job.STATES.READY,
run_after=timezone.make_aware(datetime(2025, 1, 1, 13, 0, 0)),
)
Job.objects.create(
name="testjob", queue_name="testqueue", state=Job.STATES.READY
)
Job.objects.create(
name="testjob", queue_name="testqueue", state=Job.STATES.READY
)

stdout = StringIO()
call_command("queue_depth", exclude_future_jobs=True, stdout=stdout)
output = stdout.getvalue()
self.assertEqual(output.strip(), "event=queue_depths default=1")

def test_queue_depth_multiple_queues(self):

Job.objects.create(name="testjob", state=Job.STATES.FAILED)
Expand Down

0 comments on commit 2fc1b10

Please sign in to comment.