diff --git a/django_dramatiq/admin.py b/django_dramatiq/admin.py index 4ded8e8..57d5671 100644 --- a/django_dramatiq/admin.py +++ b/django_dramatiq/admin.py @@ -1,60 +1,18 @@ -import abc import json from datetime import datetime from django.conf import settings from django.contrib import admin -from django.contrib.admin import SimpleListFilter from django.utils import timezone from django.utils.safestring import mark_safe -from dramatiq import Message from .models import Task -class TaskFilter(abc.ABC, SimpleListFilter): - def lookups(self, request, model_admin): - lookup_choices = set() - - messages = ( - Message.decode(bytes(encoded_message)) - for encoded_message in model_admin.model.tasks.values_list( - "message_data", flat=True - ) - ) - - for message in messages: - choice = getattr(message, self.parameter_name) - lookup_choices.add((choice, "{} (slow)".format(choice))) - - return sorted(lookup_choices) - - def queryset(self, request, queryset): - if self.value(): - filter_ids = ( - task.id - for task in queryset - if getattr(task.message, self.parameter_name) == self.value() - ) - return queryset.filter(id__in=filter_ids) - - return queryset - - -class QueueNameFilter(TaskFilter): - title = "queue_name" - parameter_name = "queue_name" - - -class ActorNameFilter(TaskFilter): - title = "actor_name" - parameter_name = "actor_name" - - @admin.register(Task) class TaskAdmin(admin.ModelAdmin): exclude = ("message_data",) - readonly_fields = ("message_details", "traceback", "status") + readonly_fields = ("message_details", "traceback", "status", "queue_name", "actor_name") list_display = ( "__str__", "status", @@ -64,17 +22,12 @@ class TaskAdmin(admin.ModelAdmin): "queue_name", "actor_name", ) - list_filter = ("status", "created_at", QueueNameFilter, ActorNameFilter) - - def queue_name(self, instance): - return instance.message.queue_name - - def actor_name(self, instance): - return instance.message.actor_name + list_filter = ("status", "created_at", "queue_name", "actor_name") + search_fields = ("actor_name",) def eta(self, instance): timestamp = ( - instance.message.options.get("eta", instance.message.message_timestamp) / 1000 + instance.message.options.get("eta", instance.message.message_timestamp) / 1000 ) # Django expects a timezone-aware datetime if USE_TZ is True, and a naive datetime in localtime otherwise. diff --git a/django_dramatiq/middleware.py b/django_dramatiq/middleware.py index 0a88ab3..c8f73b7 100644 --- a/django_dramatiq/middleware.py +++ b/django_dramatiq/middleware.py @@ -18,13 +18,13 @@ def after_enqueue(self, broker, message, delay): if delay: status = Task.STATUS_DELAYED - Task.tasks.create_or_update_from_message(message, status=status) + Task.tasks.create_or_update_from_message(message, status=status, actor_name=message.actor_name, queue_name=message.queue_name) def before_process_message(self, broker, message): from .models import Task LOGGER.debug("Updating Task from message %r.", message.message_id) - Task.tasks.create_or_update_from_message(message, status=Task.STATUS_RUNNING) + Task.tasks.create_or_update_from_message(message, status=Task.STATUS_RUNNING, actor_name=message.actor_name, queue_name=message.queue_name) def after_process_message(self, broker, message, *, result=None, exception=None): from .models import Task @@ -34,7 +34,7 @@ def after_process_message(self, broker, message, *, result=None, exception=None) status = Task.STATUS_FAILED LOGGER.debug("Updating Task from message %r.", message.message_id) - Task.tasks.create_or_update_from_message(message, status=status) + Task.tasks.create_or_update_from_message(message, status=status, actor_name=message.actor_name, queue_name=message.queue_name) class DbConnectionsMiddleware(Middleware): diff --git a/django_dramatiq/migrations/0002_auto_20191104_1354.py b/django_dramatiq/migrations/0002_auto_20191104_1354.py new file mode 100644 index 0000000..017a3a8 --- /dev/null +++ b/django_dramatiq/migrations/0002_auto_20191104_1354.py @@ -0,0 +1,34 @@ +# -*- coding: utf-8 -*- +from __future__ import unicode_literals + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('django_dramatiq', '0001_initial'), + ] + + operations = [ + migrations.AlterModelManagers( + name='task', + managers=[ + ], + ), + migrations.AddField( + model_name='task', + name='actor_name', + field=models.CharField(max_length=300, null=True), + ), + migrations.AddField( + model_name='task', + name='queue_name', + field=models.CharField(max_length=100, null=True), + ), + migrations.AlterField( + model_name='task', + name='updated_at', + field=models.DateTimeField(db_index=True, auto_now=True), + ), + ] diff --git a/django_dramatiq/models.py b/django_dramatiq/models.py index f4a0ac2..ed4b569 100644 --- a/django_dramatiq/models.py +++ b/django_dramatiq/models.py @@ -45,9 +45,12 @@ class Task(models.Model): id = models.UUIDField(primary_key=True, editable=False) status = models.CharField(max_length=8, choices=STATUSES, default=STATUS_ENQUEUED) created_at = models.DateTimeField(auto_now_add=True) - updated_at = models.DateTimeField(auto_now=True) + updated_at = models.DateTimeField(auto_now=True, db_index=True) message_data = models.BinaryField() + actor_name = models.CharField(max_length=300, null=True) + queue_name = models.CharField(max_length=100, null=True) + tasks = TaskManager() class Meta: